diff --git a/python/paddle/distributed/auto_parallel/dist_loader.py b/python/paddle/distributed/auto_parallel/dist_loader.py index 5af0dd12f3ff9..5c0cd89306c90 100644 --- a/python/paddle/distributed/auto_parallel/dist_loader.py +++ b/python/paddle/distributed/auto_parallel/dist_loader.py @@ -17,16 +17,16 @@ import numpy as np import paddle -from paddle.fluid.dataloader.batch_sampler import ( +from paddle.io import BatchSampler, IterableDataset +from paddle.io.dataloader.batch_sampler import ( DistributedBatchSampler, _InfiniteIterableSampler, ) -from paddle.fluid.dataloader.dataloader_iter import ( +from paddle.io.dataloader.dataloader_iter import ( _DatasetKind, default_collate_fn, default_convert_fn, ) -from paddle.io import BatchSampler, IterableDataset class DistributedDataLoaderBase(metaclass=abc.ABCMeta): @@ -272,7 +272,7 @@ def __next__(self): return next(self.data) def _create_inner_dataloader(self): - dataloader = paddle.fluid.io.DataLoader( + dataloader = paddle.io.DataLoader( self.dataset, feed_list=self.feed_list, places=self.places, diff --git a/python/paddle/fluid/io.py b/python/paddle/fluid/io.py index 605db8e932bf8..e0e102e2393f9 100644 --- a/python/paddle/fluid/io.py +++ b/python/paddle/fluid/io.py @@ -55,8 +55,6 @@ from . import reader from . import unique_name from .reader import * -from . import dataloader -from .dataloader import * from . import core from paddle.utils import deprecated from paddle.fluid.framework import static_only diff --git a/python/paddle/fluid/reader.py b/python/paddle/fluid/reader.py index 4724077f1c9ec..ed294700b1621 100644 --- a/python/paddle/fluid/reader.py +++ b/python/paddle/fluid/reader.py @@ -40,14 +40,6 @@ _cleanup, _set_SIGCHLD_handler, ) -from .dataloader import BatchSampler, Dataset, IterableDataset, Subset -from .dataloader.dataloader_iter import ( - _DataLoaderIterSingleProcess, - _DataLoaderIterMultiProcess, - _DatasetKind, - default_collate_fn, -) -from .dataloader.batch_sampler import _InfiniteIterableSampler from .layers.io import ( monkey_patch_reader_methods, _copy_reader_var_, @@ -69,22 +61,12 @@ # NOTE: [ avoid hanging & failed quickly ] These value is used in getting data from another process QUEUE_GET_TIMEOUT = 60 -__all__ = ['PyReader', 'DataLoader', 'default_collate_fn'] +__all__ = ['PyReader', 'DataLoader'] data_loader_unique_name_generator = UniqueNameGenerator() KEEP_DATA_LOADER_ORDER = True USE_PINNED_MEMORY = None -# AutoTune Flags -USE_AUTOTUNE = False -TUNING_STEPS = 500 - - -def set_autotune_config(use_autotune, tuning_steps=500): - global USE_AUTOTUNE - USE_AUTOTUNE = use_autotune - global TUNING_STEPS - TUNING_STEPS = tuning_steps def keep_data_loader_order(*args): @@ -171,454 +153,7 @@ def _check_input_array(cls, item): return arr -class AuToTune: - def __init__(self, loader): - self.loader = loader - self.max_num_worker = multiprocessing.cpu_count() / 2 - - def __call__(self): - # use default loader - if (not USE_AUTOTUNE) or (not self.need_autotune()): - return self.loader.num_workers - - # get autotune loader - auto_tune_loader = self.get_autotune_loader() - if auto_tune_loader is None: - return self.loader.num_workers - - # pick the best num_workers - auto_tune_start = time.time() - logging.debug("========= DataLoader Auto Tune =========") - logging.debug( - "User config for DataLoader: " + str(self.loader.num_workers) - ) - best_num_workers = 0 - min_cost = float("inf") - logging.debug( - "Tuning Range for num_workers: 0 ~ " + str(self.max_num_worker) - ) - num_workers = 0 - while num_workers < 
self.max_num_worker: - auto_tune_loader.num_workers = num_workers - avg_cost = self.evaluate_reader_cost(auto_tune_loader) - if min_cost * 0.75 > avg_cost: - min_cost = avg_cost - best_num_workers = num_workers - else: - update_num = self.is_best( - auto_tune_loader, - best_num_workers, - min_cost, - self.max_num_worker, - ) - if update_num == best_num_workers: - break - else: - best_num_workers = update_num - logging.debug( - "num_workers: " - + str(num_workers) - + " avg_cost: " - + str(avg_cost) - ) - num_workers += 2 - logging.info( - "auto_tune dataLoader best_num_workers: " + str(best_num_workers) - ) - logging.debug( - "AutoTuning Cost for DataLoader: " - + str(time.time() - auto_tune_start) - + ' seconds' - ) - - # tune the default loader's num_workers - return best_num_workers - - def need_autotune(self): - if sys.platform == 'darwin' or sys.platform == 'win32': - return False - else: - return True - - def get_sub_dataset(self, dataset, batch_size): - num_samples = min(batch_size * TUNING_STEPS, len(dataset)) - sub_dataset = Subset(dataset, indices=list(range(num_samples))) - return sub_dataset - - def get_autotune_loader(self): - loader = copy.copy(self.loader) - batch_size = self.loader.batch_sampler.batch_size - if isinstance( - self.loader.batch_sampler, paddle.io.DistributedBatchSampler - ): - dataset = self.loader.batch_sampler.dataset - sub_dataset = self.get_sub_dataset(dataset, batch_size) - loader.batch_sampler = paddle.io.DistributedBatchSampler( - dataset=sub_dataset, - batch_size=batch_size, - num_replicas=self.loader.batch_sampler.nranks, - rank=self.loader.batch_sampler.local_rank, - shuffle=self.loader.batch_sampler.shuffle, - drop_last=self.loader.batch_sampler.drop_last, - ) - elif isinstance(self.loader.batch_sampler, paddle.io.BatchSampler): - dataset = self.loader.batch_sampler.sampler.data_source - sub_dataset = self.get_sub_dataset(dataset, batch_size) - loader.batch_sampler = paddle.io.BatchSampler( - dataset=sub_dataset, - batch_size=batch_size, - drop_last=self.loader.batch_sampler.drop_last, - ) - else: - loader = None - return loader - - def evaluate_reader_cost(self, reader): - costs = [] - avg_cost = 0 - start = time.time() - for i, data in enumerate(reader): - costs.append(time.time() - start) - start = time.time() - if len(costs) > 2: - avg_cost = sum(costs[2:]) / len(costs[2:]) - else: - avg_cost = sum(costs[0:]) / len(costs[0:]) - return avg_cost - - def is_best(self, reader, best_workers, best_time, num_work_boundary): - step = 0 - num_workers = best_workers + 1 - boundary = 1 - while num_workers < num_work_boundary and step < 5: - self.loader.num_workers = num_workers - time = self.evaluate_reader_cost(reader) - logging.debug( - "for back num_workers: " - + str(num_workers) - + " avg_cost: " - + str(time) - ) - step += 1 - if time < best_time * 0.70 * boundary: - return num_workers - else: - num_workers += 1 - boundary *= 0.80 - return best_workers - - class DataLoader: - """ - DataLoader prodives an iterator which iterates given dataset - once by the batch_sampler. - - DataLoader supports single-process and multi-prcess data loading, - multi-process workers will be used to load data asynchronously if - :attr:`num_workers` is set as a positive number. - - DataLoader supports map-style dataset and iterable-style dataset. - - For map-style datast(can get a sample from dataset with a given - index), please see :code:`paddle.io.Dataset`. 
- - For iterable-style datast(get samples from dataset iteratively, - like a Python iterator), please see :code:`paddle.io.IterableDataset`. - - For :code:`batch_sampler` please see :code:`paddle.io.BatchSampler` - - .. note:: - GPU tensor operation is not supported in subprocess currently, - please don't use GPU tensor operations in pipeline which will - be performed in subprocess, such as dataset transforms, collte_fn, - etc. Numpy array and CPU tensor operation is supported. - - **Disable automatic batching** - - In certain cases such as some NLP tasks, instead of automatic batching, - handling batching manually in dataset is needed by users. For these - cases, automatic batching is disabled if both :attr:`batch_size` and - :attr:`batch_sampler` is set as None, each data got from :attr:`dataset` - should be batched data and will be processed with function define by - :attr:`collate_fn` or :attr:`default_collate_fn`. - - - .. note:: - When automatic batching is disabled, :attr:`default_collate_fn` will - do nothing to data from dataset. - - - Args: - dataset(Dataset): the dataset to load data from, should be an - instance of subclass of :code:`paddle.io.Dataset` or - :code:`paddle.io.IterableDataset`. - feed_list (list(Tensor)|tuple(Tensor), optional): feed Tensor list. - The Tensors should be created by :code:`paddle.static.data()`. - :attr:`feed_list` must be set if :attr:`return_list` is - False. Default None. - places(list(Place)|tuple(Place)|list(str), optional): a list of Place, - to put data onto, :attr:`places` can be None, if - :attr:`places` is None, default place(CPUPlace or CUDAPlace(0)) - will be used. Default None. If ``places`` is list of string, - the string in the list can be ``cpu``, ``gpu:x`` and ``gpu_pinned``, - where ``x`` is the index of the GPUs. - return_list (bool, optional): whether the return value on each device is - presented as a list. If :attr:`return_list=False`, the return - value on each device would be a dict of str -> Tensor, where - the key of the dict is the name of each fed Tensors. If - :attr:`return_list=True`, the return value on each device would - be a list(Tensor). :attr:`return_list` can only be True - in dynamic graph mode. Default True. - batch_sampler(BatchSampler, optional): an instance of `paddle.io.BatchSampler` - to generate batch indices to draw samples from :attr:`dataset` - and combine a batch. Default None. - batch_size(int|None, optional): sample number in a mini-batch, a substitution - parameter for :attr:`batch_sampler`, if :attr:`batch_sampler` - is not set, a default `paddle.io.BatchSampler` will be used - and initialize by :attr:`batch_size`, :attr:`shuffle` and - :attr:`drop_last`. Default 1. - shuffle(bool, optional): whther to shuffle indices order before genrate - batch indices, a substitution parameter for :attr:`batch_sampler` - see :attr:`batch_size`. Default False. - drop_last(bool, optional): whether drop the last incomplete batch dataset size - is not divisible by the batch size, a substitution parameter - for :attr:`batch_sampler`, see :attr:`batch_size`. Default False - collate_fn(callable, optional): function to generate mini-batch data by merging - the sample list, None for only stack each fields of sample in axis - 0(same as :attr::`np.stack(..., axis=0)`). Default None - num_workers(int, optional): the number of subprocess to load data, 0 for no - subprocess used and loading data in main process. Default 0 - use_buffer_reader (bool, optional): whether to use bufferred reader. 
- If use_buffer_reader=True, the DataLoader would prefetch - batch data asynchronously, so it would speed up data feeding - and occupies a little more CPU or GPU memory, i.e., the memory - of one batch input data. Default True. - prefetch_factor (int, optional): Number of batch data the DataLoader would prefetch - if use_buffer_reader=True. Default 2. - use_shared_memory (bool, optional): whether to use shared memory to speed up - putting data into inter-process queue, set :attr:`use_shared_memory` - as True only when the shared memory space on your machine(e.g. - space of '/dev/shm' on Linux operating sysytem) is large enough. - Shared memory will only be enabled in multi-process mode(num_workers - > 0). Default True. - timeout(int, optional): the timeout value for getting data form output queue - of subprocesses. Default 0. - worker_init_fn(callable, optional): init function which will be called with - worker id on each subproces starting if not set as None. Default - None. - - Returns: - DataLoader: an iterable object for data iterating, each elemnet of the generated data is a Tensor. - - Examples: - - .. code-block:: python - - import numpy as np - - import paddle - import paddle.nn as nn - import paddle.nn.functional as F - from paddle.io import Dataset, BatchSampler, DataLoader - - BATCH_NUM = 20 - BATCH_SIZE = 16 - EPOCH_NUM = 4 - - IMAGE_SIZE = 784 - CLASS_NUM = 10 - - # define a random dataset - class RandomDataset(Dataset): - def __init__(self, num_samples): - self.num_samples = num_samples - - def __getitem__(self, idx): - image = np.random.random([IMAGE_SIZE]).astype('float32') - label = np.random.randint(0, CLASS_NUM - 1, (1, )).astype('int64') - return image, label - - def __len__(self): - return self.num_samples - - dataset = RandomDataset(BATCH_NUM * BATCH_SIZE) - - class SimpleNet(nn.Layer): - def __init__(self): - super().__init__() - self.fc = nn.Linear(IMAGE_SIZE, CLASS_NUM) - - def forward(self, image, label=None): - return self.fc(image) - - simple_net = SimpleNet() - opt = paddle.optimizer.SGD(learning_rate=1e-3, - parameters=simple_net.parameters()) - - loader = DataLoader(dataset, - batch_size=BATCH_SIZE, - shuffle=True, - drop_last=True, - num_workers=2) - - for e in range(EPOCH_NUM): - for i, (image, label) in enumerate(loader()): - out = simple_net(image) - loss = F.cross_entropy(out, label) - avg_loss = paddle.mean(loss) - avg_loss.backward() - opt.minimize(avg_loss) - simple_net.clear_gradients() - print("Epoch {} batch {}: loss = {}".format(e, i, np.mean(loss.numpy()))) - - - .. 
note:: - For reading iterable dataset with multiprocess Dataloader, - please see :code:`paddle.io.IterableDataset` - - """ - - def __init__( - self, - dataset, - feed_list=None, - places=None, - return_list=True, - batch_sampler=None, - batch_size=1, - shuffle=False, - drop_last=False, - collate_fn=None, - num_workers=0, - use_buffer_reader=True, - prefetch_factor=2, - use_shared_memory=True, - timeout=0, - worker_init_fn=None, - persistent_workers=False, - ): - self.return_list = return_list - self.collate_fn = collate_fn - self.use_buffer_reader = use_buffer_reader - self.prefetch_factor = prefetch_factor - self.worker_init_fn = worker_init_fn - - self.dataset = dataset - - if not return_list and not _non_static_mode(): - assert ( - feed_list is not None - ), "feed_list should be set when return_list=False" - self.feed_list = feed_list - - if places is None: - places = _current_expected_place() - if isinstance(places, (list, tuple)): - places = _get_paddle_place_list(places) - else: - places = _get_paddle_place(places) - self.places = _convert_places(places) - - assert num_workers >= 0, "num_workers should be a non-negative value" - if num_workers > 0 and ( - sys.platform == 'darwin' or sys.platform == 'win32' - ): - warnings.warn( - "DataLoader with multi-process mode is not supported on MacOs and Windows currently." - " Please use signle-process mode with num_workers = 0 instead" - ) - num_workers = 0 - self.num_workers = num_workers - - assert prefetch_factor > 0, "prefetch_factor should be a positive value" - - self.use_shared_memory = use_shared_memory - if use_shared_memory and num_workers == 0: - self.use_shared_memory = False - - assert timeout >= 0, "timeout should be a non-negative value" - self.timeout = timeout - - if isinstance(dataset, IterableDataset): - self.dataset_kind = _DatasetKind.ITER - if shuffle: - raise ValueError( - "IterableDataset not support shuffle, but got shuffle={}".format( - shuffle - ) - ) - if batch_sampler is not None: - raise ValueError( - "IterableDataset expect unspecified batch_sampler" - ) - else: - self.dataset_kind = _DatasetKind.MAP - - if batch_sampler is not None: - assert batch_size == 1 and not shuffle and not drop_last, ( - "batch_size/shuffle/drop_last should not be set when " - "batch_sampler is given" - ) - self.batch_sampler = batch_sampler - self.batch_size = None - elif batch_size is None: - self.batch_sampler = None - self.batch_size = None - else: - assert batch_size > 0, ( - "batch_size should be None or a positive value when " - "batch_sampler is not given" - ) - self.batch_size = batch_size - if isinstance(dataset, IterableDataset): - self.batch_sampler = _InfiniteIterableSampler( - dataset, batch_size - ) - else: - self.batch_sampler = BatchSampler( - dataset=dataset, - batch_size=batch_size, - shuffle=shuffle, - drop_last=drop_last, - ) - - self.drop_last = drop_last - self.auto_collate_batch = self.batch_sampler is not None - - self.pin_memory = False - if _non_static_mode(): - self.pin_memory = ( - True if use_pinned_memory() is None else use_pinned_memory() - ) - - self._persistent_workers = persistent_workers - self._iterator = None - self.num_workers = AuToTune(self).__call__() - - def __len__(self): - if self.dataset_kind == _DatasetKind.ITER: - raise ValueError("length of IterableDataset not supported") - else: - if self.auto_collate_batch: - return len(self.batch_sampler) - else: - return len(self.dataset) - - def __iter__(self): - if self.num_workers == 0: - return _DataLoaderIterSingleProcess(self) - elif 
self._persistent_workers: - if self._iterator is None: - self._iterator = _DataLoaderIterMultiProcess(self) - else: - self._iterator._reset() - return self._iterator - else: - return _DataLoaderIterMultiProcess(self) - - def __call__(self): - return self.__iter__() - @staticmethod def from_generator( feed_list=None, @@ -793,7 +328,7 @@ def set_data_source(loader, places): label = static.data(name='label', shape=[None, 1], dtype='int64') # Define DataLoader - loader = paddle.io.DataLoader.from_generator(feed_list=[image, label], capacity=16, iterable=ITERABLE) + loader = paddle.fluid.io.DataLoader.from_generator(feed_list=[image, label], capacity=16, iterable=ITERABLE) # Define network loss = simple_net(image, label) @@ -867,7 +402,7 @@ def forward(self, x): adam = opt.Adam(learning_rate=0.001, parameters=dp_layer.parameters()) # create data loader - loader = paddle.io.DataLoader.from_generator(capacity=5) + loader = paddle.fluid.io.DataLoader.from_generator(capacity=5) loader.set_batch_generator(random_batch_reader()) for epoch_id in range(EPOCH_NUM): @@ -944,7 +479,7 @@ def from_dataset(dataset, places, drop_last=True): use_var=[image, label]) dataset.set_filelist(['a.txt', 'b.txt', 'c.txt']) - loader = paddle.io.DataLoader.from_dataset(dataset, static.cpu_places()) + loader = paddle.fluid.io.DataLoader.from_dataset(dataset, static.cpu_places()) """ return DatasetLoader(dataset, places, drop_last) diff --git a/python/paddle/fluid/tests/unittests/collective/fleet/dygraph_save_for_auto_infer.py b/python/paddle/fluid/tests/unittests/collective/fleet/dygraph_save_for_auto_infer.py index c74e2b7adaa22..a2a9c9113271b 100644 --- a/python/paddle/fluid/tests/unittests/collective/fleet/dygraph_save_for_auto_infer.py +++ b/python/paddle/fluid/tests/unittests/collective/fleet/dygraph_save_for_auto_infer.py @@ -37,8 +37,8 @@ ) from paddle.distributed.sharding.group_sharded import group_sharded_parallel from paddle.distributed.utils.log_utils import get_logger -from paddle.fluid.dataloader.dataset import IterableDataset from paddle.incubate.distributed.utils.io import save_for_auto_inference +from paddle.io import IterableDataset from paddle.nn import Linear logger = get_logger("INFO", __file__) diff --git a/python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_dataset.py b/python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_dataset.py index d7e09481a1c71..9e2b89b12860c 100755 --- a/python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_dataset.py +++ b/python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_dataset.py @@ -406,7 +406,7 @@ def setUp(self): ] def test_main(self): - from paddle.fluid.dataloader.worker import _generate_states + from paddle.io.dataloader.worker import _generate_states for inp, outp in zip(self.inputs, self.outputs): out = _generate_states(*inp) diff --git a/python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_exception.py b/python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_exception.py index 2b4d1a78d1ea3..bfd08f703c4f6 100644 --- a/python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_exception.py +++ b/python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_exception.py @@ -19,8 +19,8 @@ from paddle import fluid from paddle.fluid import core -from paddle.fluid.dataloader.dataloader_iter import _worker_loop from paddle.io import BatchSampler, DataLoader, Dataset, IterableDataset +from paddle.io.dataloader.worker import _worker_loop class RandomDataset(Dataset): diff --git 
a/python/paddle/incubate/autotune.py b/python/paddle/incubate/autotune.py index 742c5bded064a..dfad1dc58c928 100644 --- a/python/paddle/incubate/autotune.py +++ b/python/paddle/incubate/autotune.py @@ -84,7 +84,7 @@ def set_config(config=None): if config is None: core.enable_autotune() core.enable_layout_autotune() - paddle.fluid.reader.set_autotune_config(use_autotune=True) + paddle.io.reader.set_autotune_config(use_autotune=True) return config_dict = {} @@ -147,7 +147,7 @@ def set_config(config=None): ) if "tuning_steps" in dataloader_config: if isinstance(dataloader_config['tuning_steps'], int): - paddle.fluid.reader.set_autotune_config( + paddle.io.reader.set_autotune_config( use_autoune, dataloader_config['tuning_steps'] ) else: @@ -155,4 +155,4 @@ def set_config(config=None): "The auto-tuning configuration of the dataloader is incorrect." "The `tuning_steps` should be int. Use default parameter instead." ) - paddle.fluid.reader.set_autotune_config(use_autoune) + paddle.io.reader.set_autotune_config(use_autoune) diff --git a/python/paddle/io/__init__.py b/python/paddle/io/__init__.py index a9c0e9a2f2d2f..6c2e0dae67834 100755 --- a/python/paddle/io/__init__.py +++ b/python/paddle/io/__init__.py @@ -14,21 +14,21 @@ # TODO: define all functions about input & output in this directory -from ..fluid.io import DataLoader # noqa: F401 -from ..fluid.dataloader import Dataset # noqa: F401 -from ..fluid.dataloader import IterableDataset # noqa: F401 -from ..fluid.dataloader import BatchSampler # noqa: F401 -from ..fluid.dataloader import get_worker_info # noqa: F401 -from ..fluid.dataloader import TensorDataset # noqa: F401 -from ..fluid.dataloader import Sampler # noqa: F401 -from ..fluid.dataloader import SequenceSampler # noqa: F401 -from ..fluid.dataloader import RandomSampler # noqa: F401 -from ..fluid.dataloader import DistributedBatchSampler # noqa: F401 -from ..fluid.dataloader import ComposeDataset # noqa: F401 -from ..fluid.dataloader import ChainDataset # noqa: F401 -from ..fluid.dataloader import WeightedRandomSampler # noqa: F401 -from ..fluid.dataloader import Subset # noqa: F401 -from ..fluid.dataloader import random_split # noqa: F401 +from .reader import DataLoader # noqa: F401 +from .dataloader import Dataset # noqa: F401 +from .dataloader import IterableDataset # noqa: F401 +from .dataloader import BatchSampler # noqa: F401 +from .dataloader import get_worker_info # noqa: F401 +from .dataloader import TensorDataset # noqa: F401 +from .dataloader import Sampler # noqa: F401 +from .dataloader import SequenceSampler # noqa: F401 +from .dataloader import RandomSampler # noqa: F401 +from .dataloader import DistributedBatchSampler # noqa: F401 +from .dataloader import ComposeDataset # noqa: F401 +from .dataloader import ChainDataset # noqa: F401 +from .dataloader import WeightedRandomSampler # noqa: F401 +from .dataloader import Subset # noqa: F401 +from .dataloader import random_split # noqa: F401 __all__ = [ # noqa 'Dataset', diff --git a/python/paddle/fluid/dataloader/__init__.py b/python/paddle/io/dataloader/__init__.py similarity index 55% rename from python/paddle/fluid/dataloader/__init__.py rename to python/paddle/io/dataloader/__init__.py index c0b2052283b1c..bb65463f70afc 100644 --- a/python/paddle/fluid/dataloader/__init__.py +++ b/python/paddle/io/dataloader/__init__.py @@ -12,21 +12,20 @@ # See the License for the specific language governing permissions and # limitations under the License. -from . 
import dataset -from .dataset import * +from .dataset import Dataset +from .dataset import IterableDataset +from .dataset import TensorDataset +from .dataset import ComposeDataset +from .dataset import ChainDataset +from .dataset import random_split +from .dataset import Subset -from . import batch_sampler -from .batch_sampler import * +from .batch_sampler import BatchSampler +from .batch_sampler import DistributedBatchSampler -from . import dataloader_iter -from .dataloader_iter import * +from .worker import get_worker_info -from . import sampler -from .sampler import * - -__all__ = ( - dataset.__all__ - + batch_sampler.__all__ - + dataloader_iter.__all__ - + sampler.__all__ -) +from .sampler import Sampler +from .sampler import SequenceSampler +from .sampler import RandomSampler +from .sampler import WeightedRandomSampler diff --git a/python/paddle/fluid/dataloader/batch_sampler.py b/python/paddle/io/dataloader/batch_sampler.py similarity index 98% rename from python/paddle/fluid/dataloader/batch_sampler.py rename to python/paddle/io/dataloader/batch_sampler.py index 3e0449719c4cd..190e9240900f8 100644 --- a/python/paddle/fluid/dataloader/batch_sampler.py +++ b/python/paddle/io/dataloader/batch_sampler.py @@ -12,13 +12,12 @@ # See the License for the specific language governing permissions and # limitations under the License. -import numpy as np import math -from .sampler import Sampler, SequenceSampler, RandomSampler -from .dataset import Dataset, IterableDataset +import numpy as np -__all__ = ["BatchSampler", "DistributedBatchSampler"] +from .dataset import IterableDataset +from .sampler import RandomSampler, Sampler, SequenceSampler class BatchSampler(Sampler): diff --git a/python/paddle/fluid/dataloader/collate.py b/python/paddle/io/dataloader/collate.py similarity index 97% rename from python/paddle/fluid/dataloader/collate.py rename to python/paddle/io/dataloader/collate.py index dd70a3421409d..141624668f09b 100644 --- a/python/paddle/fluid/dataloader/collate.py +++ b/python/paddle/io/dataloader/collate.py @@ -12,13 +12,14 @@ # See the License for the specific language governing permissions and # limitations under the License. -import paddle import numbers +from collections.abc import Mapping, Sequence + import numpy as np -from ..framework import _non_static_mode -from .. import core, layers -from collections.abc import Sequence, Mapping +import paddle + +from ...framework import core def default_collate_fn(batch): diff --git a/python/paddle/fluid/dataloader/dataloader_iter.py b/python/paddle/io/dataloader/dataloader_iter.py similarity index 98% rename from python/paddle/fluid/dataloader/dataloader_iter.py rename to python/paddle/io/dataloader/dataloader_iter.py index 2b06c371ef36f..43b749c869dd6 100644 --- a/python/paddle/fluid/dataloader/dataloader_iter.py +++ b/python/paddle/io/dataloader/dataloader_iter.py @@ -12,51 +12,39 @@ # See the License for the specific language governing permissions and # limitations under the License. 
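# A quick import smoke test for the module moves diffed above (a hedged sketch:
# it assumes this patch is applied; the private names are taken verbatim from
# the hunks above, e.g. the dist_loader.py import block).
from paddle.io import BatchSampler, DataLoader, Dataset, IterableDataset
from paddle.io.dataloader.batch_sampler import (
    DistributedBatchSampler,
    _InfiniteIterableSampler,
)
from paddle.io.dataloader.dataloader_iter import default_collate_fn

print("paddle.io layout OK")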
+import itertools +import logging import os +import queue import sys -import time -import signal -import numbers -import logging -import itertools import threading +import time import warnings -import numpy as np -from collections import namedtuple -from paddle.fluid.framework import ( - _set_expected_place, - _current_expected_place, - set_flags, -) -import queue +import numpy as np import paddle -import paddle.profiler as profiler +from paddle import profiler +from paddle.fluid.framework import _current_expected_place, _set_expected_place +from paddle.profiler.timer import benchmark from paddle.profiler.utils import in_profiler_mode -from .. import core, layers -from ..framework import in_dygraph_mode + +from ...framework import core, in_dygraph_mode from ..multiprocess_utils import ( - _set_SIGCHLD_handler, MP_STATUS_CHECK_INTERVAL, CleanupFuncRegistrar, + _set_SIGCHLD_handler, ) -from .fetcher import _IterableDatasetFetcher, _MapDatasetFetcher from .batch_sampler import _InfiniteIterableSampler from .collate import default_collate_fn, default_convert_fn +from .flat import _flatten_batch, _restore_batch from .worker import ( - ParentWatchDog, - get_worker_info, - _worker_loop, _DatasetKind, _IterableDatasetStopIteration, - _WorkerException, _ResumeIteration, + _worker_loop, + _WorkerException, ) -from .flat import _flatten_batch, _restore_batch -from paddle.profiler.timer import benchmark - -__all__ = ['get_worker_info'] # NOTE: fix `terminate called without an active exception` # if for loop break and program exit immediately(with no model @@ -95,7 +83,7 @@ class _DataLoaderIterBase: data by setting in given dataloader. Args: - loader(instance of DataLoader): instance of `fluid.io.DataLoader` + loader(instance of DataLoader): instance of `paddle.io.DataLoader` """ def __init__(self, loader): @@ -439,7 +427,7 @@ def __init__(self, loader): self._shutdown = False def _init_workers(self): - import paddle.incubate.multiprocessing as multiprocessing + from paddle.incubate import multiprocessing # multiprocess worker and indice queue list initial as empty self._workers = [] diff --git a/python/paddle/fluid/dataloader/dataset.py b/python/paddle/io/dataloader/dataset.py similarity index 98% rename from python/paddle/fluid/dataloader/dataset.py rename to python/paddle/io/dataloader/dataset.py index 3701da0b33ec7..e8bb6bbd364c8 100755 --- a/python/paddle/fluid/dataloader/dataset.py +++ b/python/paddle/io/dataloader/dataset.py @@ -13,17 +13,8 @@ # limitations under the License. import paddle -from .. import framework - -__all__ = [ - "Dataset", - "IterableDataset", - "TensorDataset", - "ComposeDataset", - "ChainDataset", - "random_split", - "Subset", -] + +from ... import framework class Dataset: diff --git a/python/paddle/fluid/dataloader/fetcher.py b/python/paddle/io/dataloader/fetcher.py similarity index 60% rename from python/paddle/fluid/dataloader/fetcher.py rename to python/paddle/io/dataloader/fetcher.py index b097a315c0c73..309d009cfc106 100644 --- a/python/paddle/fluid/dataloader/fetcher.py +++ b/python/paddle/io/dataloader/fetcher.py @@ -12,12 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. 
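# A hedged usage sketch of the relocated iterator machinery, using only the
# public API: with num_workers=0 the DataLoader hands back
# _DataLoaderIterSingleProcess (see the __iter__ body later in this patch),
# and an iterable-style dataset is wrapped with _InfiniteIterableSampler.
import numpy as np

from paddle.io import DataLoader, IterableDataset

class StreamDataset(IterableDataset):
    # samples are produced by iteration rather than indexing
    def __iter__(self):
        for _ in range(8):
            yield np.random.random([4]).astype('float32')

loader = DataLoader(StreamDataset(), batch_size=2, num_workers=0)
for batch in loader:
    assert list(batch.shape) == [2, 4]  # two stacked samples per batch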
-import logging -from ..log_helper import get_logger -from collections.abc import Sequence, Mapping - -_WARNING_TO_LOG = True - class _DatasetFetcher: def __init__(self, dataset, auto_collate_batch, collate_fn, drop_last): @@ -37,47 +31,8 @@ def __init__(self, dataset, auto_collate_batch, collate_fn, drop_last): # ecah sample processing in the batch def fetch(self, batch_indices, done_event=None): raise NotImplementedError( - "'fetch' not implement for class {}".format(self.__class__.__name__) - ) - - def _log_warning(self): - # only log warning on GPU 0 when distributed launch - from ...distributed import get_world_size, get_rank - - if get_world_size() >= 2 and get_rank() != 0: - return - - warn_str = ( - "Detect dataset only contains single fileds, return format " - "changed since Paddle 2.1. In Paddle <= 2.0, DataLoader add " - "a list surround output data(e.g. return [data]), and in " - "Paddle >= 2.1, DataLoader return the single filed directly " - "(e.g. return data). For example, in following code: \n\n" - ) - warn_str += ( - "import numpy as np\n" - "from paddle.io import DataLoader, Dataset\n\n" - "class RandomDataset(Dataset):\n" - " def __getitem__(self, idx):\n" - " data = np.random.random((2, 3)).astype('float32')\n\n" - " return data\n\n" - " def __len__(self):\n" - " return 10\n\n" - "dataset = RandomDataset()\n" - "loader = DataLoader(dataset, batch_size=1)\n" - "data = next(loader())\n\n" - ) - - warn_str += ( - "In Paddle <= 2.0, data is in format '[Tensor(shape=(1, 2, 3), " - "dtype=float32)]', and in Paddle >= 2.1, data is in format" - " 'Tensor(shape=(1, 2, 3), dtype=float32)'\n" - ) - - logger = get_logger( - "DataLoader", logging.INFO, fmt='%(levelname)s: %(message)s' + f"'fetch' not implemented for class {self.__class__.__name__}" ) - logger.warning(warn_str) class _IterableDatasetFetcher(_DatasetFetcher): @@ -103,10 +58,6 @@ def fetch(self, batch_indices, done_event=None): ): raise StopIteration - global _WARNING_TO_LOG - if not isinstance(data[0], (Sequence, Mapping)) and _WARNING_TO_LOG: - self._log_warning() - _WARNING_TO_LOG = False else: data = next(self.dataset_iter) @@ -128,10 +79,6 @@ def fetch(self, batch_indices, done_event=None): else: return None - global _WARNING_TO_LOG - if not isinstance(data[0], (Sequence, Mapping)) and _WARNING_TO_LOG: - self._log_warning() - _WARNING_TO_LOG = False else: data = self.dataset[batch_indices] diff --git a/python/paddle/fluid/dataloader/flat.py b/python/paddle/io/dataloader/flat.py similarity index 93% rename from python/paddle/fluid/dataloader/flat.py rename to python/paddle/io/dataloader/flat.py index 1e1ed1eebd806..f674d7fb2b4b9 100644 --- a/python/paddle/fluid/dataloader/flat.py +++ b/python/paddle/io/dataloader/flat.py @@ -12,12 +12,12 @@ # See the License for the specific language governing permissions and # limitations under the License. 
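# The _log_warning block removed above documents behavior that still holds and
# is worth a tiny check (a hedged sketch, assuming the patch is applied): since
# Paddle 2.1 a dataset returning a single field yields that field directly, not
# a one-element list.
import numpy as np

from paddle.io import DataLoader, Dataset

class SingleFieldDataset(Dataset):
    def __getitem__(self, idx):
        return np.random.random((2, 3)).astype('float32')

    def __len__(self):
        return 10

loader = DataLoader(SingleFieldDataset(), batch_size=1)
data = next(loader())
# data is a Tensor with shape [1, 2, 3], not [Tensor(shape=(1, 2, 3))]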
-import paddle import numbers -import numpy as np +from collections.abc import Mapping, Sequence -from collections.abc import Sequence, Mapping +import numpy as np +import paddle FIELD_PREFIX = "_paddle_field_" @@ -38,7 +38,7 @@ def _flatten(batch, flat_batch, structure, field_idx): field, (np.ndarray, paddle.Tensor, paddle.fluid.core.eager.Tensor), ): - structure.append('{}{}'.format(FIELD_PREFIX, field_idx)) + structure.append(f'{FIELD_PREFIX}{field_idx}') flat_batch.append(field) field_idx += 1 elif isinstance(field, (str, bytes, numbers.Number)): @@ -61,7 +61,7 @@ def _flatten(batch, flat_batch, structure, field_idx): field, (np.ndarray, paddle.Tensor, paddle.fluid.core.eager.Tensor), ): - structure[k] = '{}{}'.format(FIELD_PREFIX, field_idx) + structure[k] = f'{FIELD_PREFIX}{field_idx}' flat_batch.append(field) field_idx += 1 elif isinstance(field, (str, bytes, numbers.Number)): @@ -79,7 +79,7 @@ def _flatten(batch, flat_batch, structure, field_idx): else: structure[k] = field else: - raise TypeError("wrong flat data type: {}".format(type(batch))) + raise TypeError(f"wrong flat data type: {type(batch)}") return structure, field_idx @@ -130,7 +130,7 @@ def _restore(structure, field_idx): elif isinstance(field, (Sequence, Mapping)): field_idx = _restore(structure[k], field_idx) else: - raise TypeError("wrong flat data type: {}".format(type(structure))) + raise TypeError(f"wrong flat data type: {type(structure)}") return field_idx @@ -145,7 +145,7 @@ def _restore(structure, field_idx): if isinstance(structure, (str, bytes)): assert structure == '{}{}'.format( FIELD_PREFIX, 0 - ), "invalid structure: {}".format(structure) + ), f"invalid structure: {structure}" return flat_batch[0] field_idx = _restore(structure, 0) assert field_idx + 1 == len(flat_batch), "Tensor parse incomplete" diff --git a/python/paddle/fluid/dataloader/sampler.py b/python/paddle/io/dataloader/sampler.py similarity index 98% rename from python/paddle/fluid/dataloader/sampler.py rename to python/paddle/io/dataloader/sampler.py index a6ec3ffbae9b8..aa8a4e649c76c 100644 --- a/python/paddle/fluid/dataloader/sampler.py +++ b/python/paddle/io/dataloader/sampler.py @@ -13,14 +13,8 @@ # limitations under the License. import numpy as np -from .. import core -__all__ = [ - "Sampler", - "SequenceSampler", - "RandomSampler", - "WeightedRandomSampler", -] +from ...framework import core class Sampler: @@ -317,7 +311,7 @@ def __iter__(self): idxs = _weighted_sample( self.weights, self.num_samples, self.replacement ) - return iter(idxs.reshape((-1)).tolist()) + return iter(idxs.reshape(-1).tolist()) def __len__(self): mul = np.prod(self.weights.shape) // self.weights.shape[-1] diff --git a/python/paddle/fluid/dataloader/worker.py b/python/paddle/io/dataloader/worker.py similarity index 98% rename from python/paddle/fluid/dataloader/worker.py rename to python/paddle/io/dataloader/worker.py index de6c382054e0a..4ca80e09ae65e 100644 --- a/python/paddle/fluid/dataloader/worker.py +++ b/python/paddle/io/dataloader/worker.py @@ -13,25 +13,25 @@ # limitations under the License. import os + +# NOTE: queue has a different name in python2 and python3 +import queue import sys -import paddle -import numpy as np import traceback -from collections import namedtuple -from .. 
import core -from .fetcher import _IterableDatasetFetcher, _MapDatasetFetcher + +import numpy as np + +import paddle + +from ...framework import core from ..multiprocess_utils import ( - _cleanup_mmap, - CleanupFuncRegistrar, MP_STATUS_CHECK_INTERVAL, + CleanupFuncRegistrar, + _cleanup_mmap, ) -from ..framework import _non_static_mode, _in_eager_without_dygraph_check +from .fetcher import _IterableDatasetFetcher, _MapDatasetFetcher from .flat import _flatten_batch -import queue - -__all__ = ['get_worker_info'] - class _IterableDatasetStopIteration: def __init__(self, worker_id): @@ -59,7 +59,7 @@ def create_fetcher( dataset, auto_collate_batch, collate_fn, drop_last ) else: - raise NotImplementedError("unknown Dataset kind {}".format(kind)) + raise NotImplementedError(f"unknown Dataset kind {kind}") class ParentWatchDog: @@ -291,9 +291,9 @@ def _worker_loop( # set different numpy seed for each worker try: - import numpy as np - import time import random + + import numpy as np except ImportError: pass else: diff --git a/python/paddle/io/multiprocess_utils.py b/python/paddle/io/multiprocess_utils.py new file mode 100644 index 0000000000000..5792983ceb475 --- /dev/null +++ b/python/paddle/io/multiprocess_utils.py @@ -0,0 +1,140 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import atexit + +# NOTE: queue has a different name in python2 and python3 +import queue +import signal +import sys + +from ..framework import core + +# multi-process worker check indices queue interval, avoid +# hanging in subprocess data loading +MP_STATUS_CHECK_INTERVAL = 5.0 + +# NOTE: [ mmap files clear ] If there is still data in the multiprocess queue when the main process finishes reading, +# the data in the queue needs to be popped. Then the LoDTensor read by the main process +# from the child process will automatically clear the memory-mapped file. +multiprocess_queue_set = set() + + +def _clear_multiprocess_queue_set(): + global multiprocess_queue_set + for data_queue in multiprocess_queue_set: + while True: + try: + data_queue.get_nowait() + except queue.Empty: + break + + +# NOTE: main process clear function at exit +def _cleanup(): + # NOTE: inter-process Queue shared memory objects clear function + _clear_multiprocess_queue_set() + # NOTE: main process memory map files clear function + core._cleanup_mmap_fds() + + +# NOTE: for child process clear function at exit +def _cleanup_mmap(): + # clear memory map files in child process + core._cleanup_mmap_fds() + + +# NOTE: used to register a function to be executed at interpreter exit. 
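# Hedged usage sketch for the registrar defined just below; this is an
# internal helper, and the import path assumes this new module is in place.
# The registered function runs exactly once, at interpreter exit or on
# SIGTERM, whichever comes first.
import signal

from paddle.io.multiprocess_utils import CleanupFuncRegistrar

def _release_worker_resources():
    # hypothetical cleanup; a real caller would drain queues or unmap
    # shared memory here
    pass

CleanupFuncRegistrar.register(_release_worker_resources, signals=[signal.SIGTERM])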
+class CleanupFuncRegistrar: + # Record the cleanup functions that have been executed + _executed_func_set = set() + # Record the cleanup functions that have been registered + _registered_func_set = set() + + @classmethod + def register(cls, function, signals=[]): + def _func_executor(): + if function not in cls._executed_func_set: + try: + function() + finally: + cls._executed_func_set.add(function) + + def _func_register(function): + if not callable(function): + raise TypeError("%s is not a callable object." % (function)) + # check whether the function object is hashable + if function not in cls._registered_func_set: + atexit.register(_func_executor) + cls._registered_func_set.add(function) + + def _signal_handler(signum=None, frame=None): + _func_executor() + if signum is not None: + if signum == signal.SIGINT: + raise KeyboardInterrupt + sys.exit(signum) + + def _signal_register(signals): + signals = set(signals) + for sig in signals: + orig_handler = signal.signal(sig, _signal_handler) + if orig_handler not in (signal.SIG_DFL, signal.SIG_IGN): + if ( + sig == signal.SIGINT + and orig_handler is signal.default_int_handler + ): + continue + if orig_handler not in cls._registered_func_set: + atexit.register(orig_handler) + cls._registered_func_set.add(orig_handler) + + # deal with signals + _signal_register(signals) + # deal with function + _func_register(function) + + +# NOTE: [ mmap files clear ] When the main process exits unexpectedly, the remaining +# shared memory objects in the inter-process Queue and the main process (mostly in the +# BlockingQueue) may not be completely released, resulting in the corresponding +# memory-mapped file remaining on the disk (/dev/shm), so register this function +# to clean up shared memory objects in these two queues before the python interpreter exits. +# NOTE: Currently multi-process DataLoader only supports Linux platform +if not (sys.platform == 'darwin' or sys.platform == 'win32'): + CleanupFuncRegistrar.register(_cleanup) + +# ------------ SIGCHLD handler setting -------------- +_SIGCHLD_handler_set = False + + +def _set_SIGCHLD_handler(): + global _SIGCHLD_handler_set + if _SIGCHLD_handler_set: + return + + current_handler = signal.getsignal(signal.SIGCHLD) + if not callable(current_handler): + current_handler = None + + def __handler__(signum, frame): + # NOTE: Here the signum is SIGCHLD, when the child process exits, + # this handler will be called whenever the child process exits + # normally or abnormally. + core._throw_error_if_process_failed() + if current_handler is not None: + current_handler(signum, frame) + + signal.signal(signal.SIGCHLD, __handler__) + _SIGCHLD_handler_set = True diff --git a/python/paddle/io/reader.py b/python/paddle/io/reader.py new file mode 100644 index 0000000000000..6698caa435f94 --- /dev/null +++ b/python/paddle/io/reader.py @@ -0,0 +1,528 @@ +# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
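# Hedged example of the autotune switch this new module carries over from
# fluid; the signature matches set_autotune_config as defined below. When
# enabled, AuToTune probes candidate num_workers values on a Subset of the
# dataset before regular iteration starts.
from paddle.io.reader import set_autotune_config

set_autotune_config(use_autotune=True, tuning_steps=100)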
+ +import copy +import multiprocessing + +import sys +import time +import warnings + +import paddle +from paddle.fluid.framework import logging + +from ..fluid.framework import ( + _current_expected_place, + _get_paddle_place, + _get_paddle_place_list, + _non_static_mode, +) +from ..framework import core +from .dataloader import BatchSampler, IterableDataset, Subset +from .dataloader.batch_sampler import _InfiniteIterableSampler +from .dataloader.dataloader_iter import ( + _DataLoaderIterMultiProcess, + _DataLoaderIterSingleProcess, + _DatasetKind, +) + +# NOTE: [ avoid hanging & failed quickly ] +# This value is used when getting data from another process +QUEUE_GET_TIMEOUT = 60 + +USE_PINNED_MEMORY = None +# AutoTune Flags +USE_AUTOTUNE = False +TUNING_STEPS = 500 + + +def set_autotune_config(use_autotune, tuning_steps=500): + global USE_AUTOTUNE + USE_AUTOTUNE = use_autotune + global TUNING_STEPS + TUNING_STEPS = tuning_steps + + +def use_pinned_memory(*args): + global USE_PINNED_MEMORY + if len(args) == 0: + return USE_PINNED_MEMORY + else: + assert len(args) == 1 and isinstance(args[0], bool) + USE_PINNED_MEMORY = args[0] + + +def _convert_places(places): + if not isinstance(places, (list, tuple)): + places = [places] + + ret = [] + for p in places: + if not isinstance(p, core.Place): + tmp = core.Place() + tmp.set_place(p) + p = tmp + + ret.append(p) + return ret + + +class AuToTune: + def __init__(self, loader): + self.loader = loader + self.max_num_worker = multiprocessing.cpu_count() / 2 + + def __call__(self): + # use default loader + if (not USE_AUTOTUNE) or (not self.need_autotune()): + return self.loader.num_workers + + # get autotune loader + auto_tune_loader = self.get_autotune_loader() + if auto_tune_loader is None: + return self.loader.num_workers + + # pick the best num_workers + auto_tune_start = time.time() + logging.debug("========= DataLoader Auto Tune =========") + logging.debug( + "User config for DataLoader: " + str(self.loader.num_workers) + ) + best_num_workers = 0 + min_cost = float("inf") + logging.debug( + "Tuning Range for num_workers: 0 ~ " + str(self.max_num_worker) + ) + num_workers = 0 + while num_workers < self.max_num_worker: + auto_tune_loader.num_workers = num_workers + avg_cost = self.evaluate_reader_cost(auto_tune_loader) + if min_cost * 0.75 > avg_cost: + min_cost = avg_cost + best_num_workers = num_workers + else: + update_num = self.is_best( + auto_tune_loader, + best_num_workers, + min_cost, + self.max_num_worker, + ) + if update_num == best_num_workers: + break + else: + best_num_workers = update_num + logging.debug( + "num_workers: " + + str(num_workers) + + " avg_cost: " + + str(avg_cost) + ) + num_workers += 2 + logging.info( + "auto_tune DataLoader best_num_workers: " + str(best_num_workers) + ) + logging.debug( + "AutoTuning Cost for DataLoader: " + + str(time.time() - auto_tune_start) + + ' seconds' + ) + + # tune the default loader's num_workers + return best_num_workers + + def need_autotune(self): + if sys.platform == 'darwin' or sys.platform == 'win32': + return False + else: + return True + + def get_sub_dataset(self, dataset, batch_size): + num_samples = min(batch_size * TUNING_STEPS, len(dataset)) + sub_dataset = Subset(dataset, indices=list(range(num_samples))) + return sub_dataset + + def get_autotune_loader(self): + loader = copy.copy(self.loader) + batch_size = self.loader.batch_sampler.batch_size + if isinstance( + self.loader.batch_sampler, 
paddle.io.DistributedBatchSampler + ): + dataset = self.loader.batch_sampler.dataset + sub_dataset = self.get_sub_dataset(dataset, batch_size) + loader.batch_sampler = paddle.io.DistributedBatchSampler( + dataset=sub_dataset, + batch_size=batch_size, + num_replicas=self.loader.batch_sampler.nranks, + rank=self.loader.batch_sampler.local_rank, + shuffle=self.loader.batch_sampler.shuffle, + drop_last=self.loader.batch_sampler.drop_last, + ) + elif isinstance(self.loader.batch_sampler, paddle.io.BatchSampler): + dataset = self.loader.batch_sampler.sampler.data_source + sub_dataset = self.get_sub_dataset(dataset, batch_size) + loader.batch_sampler = paddle.io.BatchSampler( + dataset=sub_dataset, + batch_size=batch_size, + drop_last=self.loader.batch_sampler.drop_last, + ) + else: + loader = None + return loader + + def evaluate_reader_cost(self, reader): + costs = [] + avg_cost = 0 + start = time.time() + for i, data in enumerate(reader): + costs.append(time.time() - start) + start = time.time() + if len(costs) > 2: + avg_cost = sum(costs[2:]) / len(costs[2:]) + else: + avg_cost = sum(costs[0:]) / len(costs[0:]) + return avg_cost + + def is_best(self, reader, best_workers, best_time, num_work_boundary): + step = 0 + num_workers = best_workers + 1 + boundary = 1 + while num_workers < num_work_boundary and step < 5: + self.loader.num_workers = num_workers + cost = self.evaluate_reader_cost(reader) + logging.debug( + "for back num_workers: " + + str(num_workers) + + " avg_cost: " + + str(cost) + ) + step += 1 + if cost < best_time * 0.70 * boundary: + return num_workers + else: + num_workers += 1 + boundary *= 0.80 + return best_workers + + +class DataLoader: + """ + DataLoader provides an iterator which iterates the given dataset + once by the batch_sampler. + + DataLoader supports single-process and multi-process data loading; + multi-process workers will be used to load data asynchronously if + :attr:`num_workers` is set as a positive number. + + DataLoader supports map-style dataset and iterable-style dataset. + + For map-style dataset (one that can get a sample from dataset with a given + index), please see :code:`paddle.io.Dataset`. + + For iterable-style dataset (one that gets samples from dataset iteratively, + like a Python iterator), please see :code:`paddle.io.IterableDataset`. + + For :code:`batch_sampler` please see :code:`paddle.io.BatchSampler`. + + .. note:: + GPU tensor operation is not supported in subprocess currently, + please don't use GPU tensor operations in pipelines which will + be performed in subprocess, such as dataset transforms, collate_fn, + etc. Numpy array and CPU tensor operations are supported. + + **Disable automatic batching** + + In certain cases such as some NLP tasks, instead of automatic batching, + users need to handle batching manually in the dataset. For these + cases, automatic batching is disabled if both :attr:`batch_size` and + :attr:`batch_sampler` are set as None, each item got from :attr:`dataset` + should be batched data and will be processed with the function defined by + :attr:`collate_fn` or :attr:`default_collate_fn`. + + + .. note:: + When automatic batching is disabled, :attr:`default_collate_fn` will + do nothing to data from dataset. + + + Args: + dataset(Dataset): the dataset to load data from, should be an + instance of subclass of :code:`paddle.io.Dataset` or + :code:`paddle.io.IterableDataset`. + feed_list (list(Tensor)|tuple(Tensor), optional): feed Tensor list. + The Tensors should be created by :code:`paddle.static.data()`. 
+ :attr:`feed_list` must be set if :attr:`return_list` is + False. Default None. + places(list(Place)|tuple(Place)|list(str), optional): a list of Place, + to put data onto, :attr:`places` can be None, if + :attr:`places` is None, default place(CPUPlace or CUDAPlace(0)) + will be used. Default None. If ``places`` is a list of strings, + the string in the list can be ``cpu``, ``gpu:x`` and ``gpu_pinned``, + where ``x`` is the index of the GPUs. + return_list (bool, optional): whether the return value on each device is + presented as a list. If :attr:`return_list=False`, the return + value on each device would be a dict of str -> Tensor, where + the key of the dict is the name of each fed Tensor. If + :attr:`return_list=True`, the return value on each device would + be a list(Tensor). :attr:`return_list` can only be True + in dynamic graph mode. Default True. + batch_sampler(BatchSampler, optional): an instance of `paddle.io.BatchSampler` + to generate batch indices to draw samples from :attr:`dataset` + and combine a batch. Default None. + batch_size(int|None, optional): sample number in a mini-batch, a substitution + parameter for :attr:`batch_sampler`, if :attr:`batch_sampler` + is not set, a default `paddle.io.BatchSampler` will be used + and initialized by :attr:`batch_size`, :attr:`shuffle` and + :attr:`drop_last`. Default 1. + shuffle(bool, optional): whether to shuffle indices order before generating + batch indices, a substitution parameter for :attr:`batch_sampler` + see :attr:`batch_size`. Default False. + drop_last(bool, optional): whether to drop the last incomplete batch when the dataset size + is not divisible by the batch size, a substitution parameter + for :attr:`batch_sampler`, see :attr:`batch_size`. Default False + collate_fn(callable, optional): function to generate mini-batch data by merging + the sample list, None for only stacking each field of the samples in axis + 0(same as :attr:`np.stack(..., axis=0)`). Default None + num_workers(int, optional): the number of subprocesses used to load data, 0 for no + subprocess and loading data in the main process. Default 0 + use_buffer_reader (bool, optional): whether to use buffered reader. + If use_buffer_reader=True, the DataLoader would prefetch + batch data asynchronously, so it would speed up data feeding + and occupy a little more CPU or GPU memory, i.e., the memory + of one batch input data. Default True. + prefetch_factor (int, optional): Number of batch data the DataLoader would prefetch + if use_buffer_reader=True. Default 2. + use_shared_memory (bool, optional): whether to use shared memory to speed up + putting data into inter-process queue, set :attr:`use_shared_memory` + as True only when the shared memory space on your machine(e.g. + space of '/dev/shm' on Linux operating system) is large enough. + Shared memory will only be enabled in multi-process mode(num_workers + > 0). Default True. + timeout(int, optional): the timeout value for getting data from the output queue + of subprocesses. Default 0. + worker_init_fn(callable, optional): init function which will be called with + worker id on each subprocess start if not set as None. Default + None. + + Returns: + DataLoader: an iterable object for data iterating, each element of the generated data is a Tensor. + + Examples: + + .. 
code-block:: python + + import numpy as np + + import paddle + import paddle.nn as nn + import paddle.nn.functional as F + from paddle.io import Dataset, BatchSampler, DataLoader + + BATCH_NUM = 20 + BATCH_SIZE = 16 + EPOCH_NUM = 4 + + IMAGE_SIZE = 784 + CLASS_NUM = 10 + + # define a random dataset + class RandomDataset(Dataset): + def __init__(self, num_samples): + self.num_samples = num_samples + + def __getitem__(self, idx): + image = np.random.random([IMAGE_SIZE]).astype('float32') + label = np.random.randint(0, CLASS_NUM - 1, (1, )).astype('int64') + return image, label + + def __len__(self): + return self.num_samples + + dataset = RandomDataset(BATCH_NUM * BATCH_SIZE) + + class SimpleNet(nn.Layer): + def __init__(self): + super().__init__() + self.fc = nn.Linear(IMAGE_SIZE, CLASS_NUM) + + def forward(self, image, label=None): + return self.fc(image) + + simple_net = SimpleNet() + opt = paddle.optimizer.SGD(learning_rate=1e-3, + parameters=simple_net.parameters()) + + loader = DataLoader(dataset, + batch_size=BATCH_SIZE, + shuffle=True, + drop_last=True, + num_workers=2) + + for e in range(EPOCH_NUM): + for i, (image, label) in enumerate(loader()): + out = simple_net(image) + loss = F.cross_entropy(out, label) + avg_loss = paddle.mean(loss) + avg_loss.backward() + opt.minimize(avg_loss) + simple_net.clear_gradients() + print("Epoch {} batch {}: loss = {}".format(e, i, np.mean(loss.numpy()))) + + + .. note:: + For reading iterable dataset with multiprocess DataLoader, + please see :code:`paddle.io.IterableDataset`. + + """ + + def __init__( + self, + dataset, + feed_list=None, + places=None, + return_list=True, + batch_sampler=None, + batch_size=1, + shuffle=False, + drop_last=False, + collate_fn=None, + num_workers=0, + use_buffer_reader=True, + prefetch_factor=2, + use_shared_memory=True, + timeout=0, + worker_init_fn=None, + persistent_workers=False, + ): + self.return_list = return_list + self.collate_fn = collate_fn + self.use_buffer_reader = use_buffer_reader + self.prefetch_factor = prefetch_factor + self.worker_init_fn = worker_init_fn + + self.dataset = dataset + + if not return_list and not _non_static_mode(): + assert ( + feed_list is not None + ), "feed_list should be set when return_list=False" + self.feed_list = feed_list + + if places is None: + places = _current_expected_place() + if isinstance(places, (list, tuple)): + places = _get_paddle_place_list(places) + else: + places = _get_paddle_place(places) + self.places = _convert_places(places) + + assert num_workers >= 0, "num_workers should be a non-negative value" + if num_workers > 0 and ( + sys.platform == 'darwin' or sys.platform == 'win32' + ): + warnings.warn( + "DataLoader with multi-process mode is not supported on macOS and Windows currently." 
+ " Please use signle-process mode with num_workers = 0 instead" + ) + num_workers = 0 + self.num_workers = num_workers + + assert prefetch_factor > 0, "prefetch_factor should be a positive value" + + self.use_shared_memory = use_shared_memory + if use_shared_memory and num_workers == 0: + self.use_shared_memory = False + + assert timeout >= 0, "timeout should be a non-negative value" + self.timeout = timeout + + if isinstance(dataset, IterableDataset): + self.dataset_kind = _DatasetKind.ITER + if shuffle: + raise ValueError( + "IterableDataset not support shuffle, but got shuffle={}".format( + shuffle + ) + ) + if batch_sampler is not None: + raise ValueError( + "IterableDataset expect unspecified batch_sampler" + ) + else: + self.dataset_kind = _DatasetKind.MAP + + if batch_sampler is not None: + assert batch_size == 1 and not shuffle and not drop_last, ( + "batch_size/shuffle/drop_last should not be set when " + "batch_sampler is given" + ) + self.batch_sampler = batch_sampler + self.batch_size = None + elif batch_size is None: + self.batch_sampler = None + self.batch_size = None + else: + assert batch_size > 0, ( + "batch_size should be None or a positive value when " + "batch_sampler is not given" + ) + self.batch_size = batch_size + if isinstance(dataset, IterableDataset): + self.batch_sampler = _InfiniteIterableSampler( + dataset, batch_size + ) + else: + self.batch_sampler = BatchSampler( + dataset=dataset, + batch_size=batch_size, + shuffle=shuffle, + drop_last=drop_last, + ) + + self.drop_last = drop_last + self.auto_collate_batch = self.batch_sampler is not None + + self.pin_memory = False + if _non_static_mode(): + self.pin_memory = ( + True if use_pinned_memory() is None else use_pinned_memory() + ) + + self._persistent_workers = persistent_workers + self._iterator = None + self.num_workers = AuToTune(self).__call__() + + def __len__(self): + if self.dataset_kind == _DatasetKind.ITER: + raise ValueError("length of IterableDataset not supported") + else: + if self.auto_collate_batch: + return len(self.batch_sampler) + else: + return len(self.dataset) + + def __iter__(self): + if self.num_workers == 0: + return _DataLoaderIterSingleProcess(self) + elif self._persistent_workers: + if self._iterator is None: + self._iterator = _DataLoaderIterMultiProcess(self) + else: + self._iterator._reset() + return self._iterator + else: + return _DataLoaderIterMultiProcess(self) + + def __call__(self): + return self.__iter__() diff --git a/python/paddle/static/quantization/post_training_quantization.py b/python/paddle/static/quantization/post_training_quantization.py index af3f35f977def..696d08cf7fda4 100644 --- a/python/paddle/static/quantization/post_training_quantization.py +++ b/python/paddle/static/quantization/post_training_quantization.py @@ -620,7 +620,7 @@ def _load_model_data(self): self._batch_nums if self._batch_nums else len(self._data_loader) ) return - self._data_loader = io.DataLoader.from_generator( + self._data_loader = reader.DataLoader.from_generator( feed_list=feed_vars, capacity=3 * self._batch_size, iterable=True ) if self._sample_generator is not None: diff --git a/python/setup.py.in b/python/setup.py.in index c87a5923f5c6a..5bb048ae2d9bf 100644 --- a/python/setup.py.in +++ b/python/setup.py.in @@ -445,7 +445,6 @@ packages=['paddle', 'paddle.fluid.proto', 'paddle.fluid.proto.profiler', 'paddle.fluid.layers', - 'paddle.fluid.dataloader', 'paddle.fluid.contrib', 'paddle.fluid.contrib.extend_optimizer', 'paddle.fluid.incubate', @@ -492,6 +491,7 @@ packages=['paddle', 
diff --git a/python/paddle/static/quantization/post_training_quantization.py b/python/paddle/static/quantization/post_training_quantization.py
index af3f35f977def..696d08cf7fda4 100644
--- a/python/paddle/static/quantization/post_training_quantization.py
+++ b/python/paddle/static/quantization/post_training_quantization.py
@@ -620,7 +620,7 @@ def _load_model_data(self):
                 self._batch_nums if self._batch_nums else len(self._data_loader)
             )
             return
-        self._data_loader = io.DataLoader.from_generator(
+        self._data_loader = reader.DataLoader.from_generator(
             feed_list=feed_vars, capacity=3 * self._batch_size, iterable=True
         )
         if self._sample_generator is not None:
diff --git a/python/setup.py.in b/python/setup.py.in
index c87a5923f5c6a..5bb048ae2d9bf 100644
--- a/python/setup.py.in
+++ b/python/setup.py.in
@@ -445,7 +445,6 @@ packages=['paddle',
           'paddle.fluid.proto',
           'paddle.fluid.proto.profiler',
           'paddle.fluid.layers',
-          'paddle.fluid.dataloader',
           'paddle.fluid.contrib',
           'paddle.fluid.contrib.extend_optimizer',
           'paddle.fluid.incubate',
@@ -492,6 +491,7 @@ packages=['paddle',
           'paddle.sparse.nn.functional',
           'paddle.incubate.xpu',
           'paddle.io',
+          'paddle.io.dataloader',
           'paddle.optimizer',
           'paddle.nn',
           'paddle.nn.functional',
diff --git a/setup.py b/setup.py
index 9e617b5e340fc..7de836bd2df29 100644
--- a/setup.py
+++ b/setup.py
@@ -1421,7 +1421,6 @@ def get_setup_parameters():
         'paddle.fluid.proto',
         'paddle.fluid.proto.profiler',
         'paddle.fluid.layers',
-        'paddle.fluid.dataloader',
         'paddle.fluid.contrib',
         'paddle.fluid.contrib.extend_optimizer',
         'paddle.fluid.incubate',
@@ -1468,6 +1467,7 @@ def get_setup_parameters():
         'paddle.sparse.nn.functional',
         'paddle.incubate.xpu',
         'paddle.io',
+        'paddle.io.dataloader',
         'paddle.optimizer',
         'paddle.nn',
         'paddle.nn.functional',
diff --git a/test/auto_parallel/auto_parallel_relaunch_model.py b/test/auto_parallel/auto_parallel_relaunch_model.py
index 290af66485512..6fa3bc9eaa1ff 100644
--- a/test/auto_parallel/auto_parallel_relaunch_model.py
+++ b/test/auto_parallel/auto_parallel_relaunch_model.py
@@ -109,7 +109,7 @@ def mlp_pretrain_forward(train_program, start_program):
     error_cost = paddle.nn.functional.square_error_cost(predict, label)
     loss = paddle.mean(error_cost)
 
-    loader = paddle.io.DataLoader.from_generator(
+    loader = paddle.fluid.io.DataLoader.from_generator(
         feed_list=[input, label], capacity=4 * batch_size, iterable=True
     )
diff --git a/test/auto_parallel/engine_api.py b/test/auto_parallel/engine_api.py
index cb0b4f2541e94..a2725a57b8e53 100644
--- a/test/auto_parallel/engine_api.py
+++ b/test/auto_parallel/engine_api.py
@@ -297,7 +297,7 @@ def train_builtin_data_vars():
     with static.program_guard(engine.main_program, engine.startup_program):
         feed_list = engine.inputs + engine.labels
         print(feed_list)
-        loader = paddle.io.DataLoader.from_generator(
+        loader = paddle.fluid.io.DataLoader.from_generator(
             feed_list=feed_list, capacity=4 * batch_size, iterable=False
         )
@@ -324,7 +324,7 @@ def train_non_builtin_data_vars():
         )
         label = static.data(name="label", shape=[batch_size, 1], dtype='int64')
 
-        loader = paddle.io.DataLoader.from_generator(
+        loader = paddle.fluid.io.DataLoader.from_generator(
             feed_list=[input, label], capacity=4 * batch_size, iterable=False
         )
         places = static.cuda_places()
@@ -383,7 +383,7 @@ def get_cost():
         )
         label = static.data(name="label", shape=[batch_size, 1], dtype='int64')
 
-        loader = paddle.io.DataLoader.from_generator(
+        loader = paddle.fluid.io.DataLoader.from_generator(
             feed_list=[input, label], capacity=4 * batch_size, iterable=False
         )
         places = static.cuda_places()
@@ -434,7 +434,7 @@ def get_cost_by_default_program():
         )
         label = static.data(name="label", shape=[batch_size, 1], dtype='int64')
 
-        loader = paddle.io.DataLoader.from_generator(
+        loader = paddle.fluid.io.DataLoader.from_generator(
             feed_list=[input, label], capacity=4 * batch_size, iterable=False
         )
         places = static.cuda_places()
diff --git a/test/auto_parallel/test_dist_attr_v2.py b/test/auto_parallel/test_dist_attr_v2.py
index 11c140a812a9f..1d15c34221f90 100644
--- a/test/auto_parallel/test_dist_attr_v2.py
+++ b/test/auto_parallel/test_dist_attr_v2.py
@@ -130,7 +130,7 @@ def get_program():
         )
         data_holder = [input, label]
         # dataloader
-        dataloader = paddle.io.DataLoader.from_generator(
+        dataloader = paddle.fluid.io.DataLoader.from_generator(
             feed_list=data_holder, capacity=4 * batch_size, iterable=False
         )
         dataloader.set_batch_generator(
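The test changes above all repoint generator-based loaders at paddle.fluid.io.DataLoader.from_generator, since from_generator stays on the fluid DataLoader rather than moving to paddle.io. A minimal sketch of the static-graph pattern these tests exercise; the shapes and the batch_generator body are illustrative only, and cpu_places is used here just to keep the sketch self-contained (the tests themselves use cuda_places):

    import numpy as np
    import paddle
    import paddle.static as static

    paddle.enable_static()
    batch_size = 4
    input = static.data(name="input", shape=[batch_size, 8], dtype='float32')
    label = static.data(name="label", shape=[batch_size, 1], dtype='int64')

    # from_generator is still reached through the fluid namespace.
    loader = paddle.fluid.io.DataLoader.from_generator(
        feed_list=[input, label], capacity=4 * batch_size, iterable=False
    )

    def batch_generator():
        for _ in range(8):
            yield (
                np.random.random([batch_size, 8]).astype('float32'),
                np.random.randint(0, 2, [batch_size, 1]).astype('int64'),
            )

    loader.set_batch_generator(batch_generator, places=static.cpu_places())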
diff --git a/test/auto_parallel/test_dist_context.py b/test/auto_parallel/test_dist_context.py
index 10f78aedd4fb9..2944b2db2a3fb 100644
--- a/test/auto_parallel/test_dist_context.py
+++ b/test/auto_parallel/test_dist_context.py
@@ -112,7 +112,7 @@ def get_program():
         )
         data_holder = [input, label]
         # dataloader
-        dataloader = paddle.io.DataLoader.from_generator(
+        dataloader = paddle.fluid.io.DataLoader.from_generator(
             feed_list=data_holder, capacity=4 * batch_size, iterable=False
         )
         dataloader.set_batch_generator(
diff --git a/test/auto_parallel/test_serialization.py b/test/auto_parallel/test_serialization.py
index 00a30e8a61d4e..d89c9596f4cdb 100644
--- a/test/auto_parallel/test_serialization.py
+++ b/test/auto_parallel/test_serialization.py
@@ -124,7 +124,7 @@ def get_program():
         )
         data_holder = [input, label]
         # dataloader
-        dataloader = paddle.io.DataLoader.from_generator(
+        dataloader = paddle.fluid.io.DataLoader.from_generator(
             feed_list=data_holder, capacity=4 * batch_size, iterable=False
         )
         dataloader.set_batch_generator(
diff --git a/test/auto_parallel/test_while_op_completion.py b/test/auto_parallel/test_while_op_completion.py
index 6d5264ab971b7..3f9b5b151ab08 100644
--- a/test/auto_parallel/test_while_op_completion.py
+++ b/test/auto_parallel/test_while_op_completion.py
@@ -148,7 +148,7 @@ def get_program():
         )
         data_holder = [input, label]
         # dataloader
-        dataloader = paddle.io.DataLoader.from_generator(
+        dataloader = paddle.fluid.io.DataLoader.from_generator(
             feed_list=data_holder, capacity=4 * batch_size, iterable=False
         )
         dataloader.set_batch_generator(
diff --git a/test/auto_parallel/test_while_op_partition.py b/test/auto_parallel/test_while_op_partition.py
index fdbcee8255a2b..95158df1f5f5f 100644
--- a/test/auto_parallel/test_while_op_partition.py
+++ b/test/auto_parallel/test_while_op_partition.py
@@ -136,7 +136,7 @@ def get_program():
         data_holder = [input, label]
         # dataloader
-        dataloader = paddle.io.DataLoader.from_generator(
+        dataloader = fluid.io.DataLoader.from_generator(
             feed_list=data_holder, capacity=4 * batch_size, iterable=False
         )
         dataloader.set_batch_generator(
diff --git a/test/dygraph_to_static/test_resnet_v2.py b/test/dygraph_to_static/test_resnet_v2.py
index bf332809ff8f0..2efbe46cedfec 100644
--- a/test/dygraph_to_static/test_resnet_v2.py
+++ b/test/dygraph_to_static/test_resnet_v2.py
@@ -255,7 +255,7 @@ def do_train(self, to_static):
             batch_size=batch_size,
             drop_last=True,
         )
-        data_loader = paddle.io.DataLoader.from_generator(
+        data_loader = paddle.fluid.io.DataLoader.from_generator(
             capacity=5, iterable=True
         )
         data_loader.set_sample_list_generator(train_reader)
diff --git a/test/dygraph_to_static/test_simnet_v2.py b/test/dygraph_to_static/test_simnet_v2.py
index 3e8cb4c10b3d4..a86259cc6d736 100644
--- a/test/dygraph_to_static/test_simnet_v2.py
+++ b/test/dygraph_to_static/test_simnet_v2.py
@@ -132,7 +132,7 @@ def train(conf_dict, to_static):
     global_step = 0
    losses = []
 
-    train_loader = paddle.io.DataLoader.from_generator(
+    train_loader = paddle.fluid.io.DataLoader.from_generator(
         capacity=16, return_list=True, iterable=True, use_double_buffer=True
     )
     get_train_examples = simnet_process.get_reader("train", epoch=args.epoch)
diff --git a/test/legacy_test/test_model.py b/test/legacy_test/test_model.py
index af3cb7bdefb74..c5d62761f7f68 100644
--- a/test/legacy_test/test_model.py
+++ b/test/legacy_test/test_model.py
@@ -199,13 +199,13 @@ def setUpClass(cls):
             mode='test', return_label=False, sample_num=sp_num
         )
 
-        cls.train_loader = fluid.io.DataLoader(
+        cls.train_loader = paddle.io.DataLoader(
             cls.train_dataset, places=cls.device, batch_size=64
         )
-        cls.val_loader = fluid.io.DataLoader(
+        cls.val_loader = paddle.io.DataLoader(
             cls.val_dataset, places=cls.device, batch_size=64
         )
-        cls.test_loader = fluid.io.DataLoader(
+        cls.test_loader = paddle.io.DataLoader(
             cls.test_dataset, places=cls.device, batch_size=64
         )
@@ -322,14 +322,14 @@ def fit(self, dynamic, num_replicas=None, rank=None, num_iters=None):
             rank=rank,
         )
 
-        train_loader = fluid.io.DataLoader(
+        train_loader = paddle.io.DataLoader(
             self.train_dataset,
             batch_sampler=train_sampler,
             places=self.device,
             return_list=True,
         )
 
-        val_loader = fluid.io.DataLoader(
+        val_loader = paddle.io.DataLoader(
             self.val_dataset,
             batch_sampler=val_sampler,
             places=self.device,
@@ -375,14 +375,14 @@ def fit_with_tuple_input(self, dynamic, num_replicas=None, rank=None):
             rank=rank,
         )
 
-        train_loader = fluid.io.DataLoader(
+        train_loader = paddle.io.DataLoader(
             self.train_dataset,
             batch_sampler=train_sampler,
             places=self.device,
             return_list=True,
         )
 
-        val_loader = fluid.io.DataLoader(
+        val_loader = paddle.io.DataLoader(
             self.val_dataset,
             batch_sampler=val_sampler,
             places=self.device,
@@ -404,7 +404,7 @@ def evaluate(self, dynamic):
             self.val_dataset, batch_size=64, shuffle=False
         )
 
-        val_loader = fluid.io.DataLoader(
+        val_loader = paddle.io.DataLoader(
             self.val_dataset,
             batch_sampler=sampler,
             places=self.device,
@@ -432,7 +432,7 @@ def predict(self, dynamic):
             self.test_dataset, batch_size=64, shuffle=False
         )
 
-        test_loader = fluid.io.DataLoader(
+        test_loader = paddle.io.DataLoader(
             self.test_dataset,
             batch_sampler=sampler,
             places=self.device,
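Taken together with the package-list changes above, the test_model.py hunks show the user-facing migration: the dataloader subpackage now ships as paddle.io.dataloader, and loaders are constructed through paddle.io.DataLoader instead of fluid.io.DataLoader. A rough sketch of that call, with a hypothetical ToyDataset standing in for the test datasets:

    import numpy as np
    import paddle
    from paddle.io import DataLoader, Dataset

    class ToyDataset(Dataset):  # hypothetical stand-in for the test datasets
        def __len__(self):
            return 128

        def __getitem__(self, idx):
            return np.random.random([4]).astype('float32')

    # Before this patch the tests built loaders via fluid.io.DataLoader(...);
    # after it, the same arguments go through paddle.io.DataLoader.
    loader = DataLoader(ToyDataset(), places=paddle.CPUPlace(), batch_size=64)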