From 3a41a449eafff1d50cb7877c4dfb57ce45edbc46 Mon Sep 17 00:00:00 2001
From: dengkaipeng
Date: Wed, 15 Jul 2020 14:54:30 +0000
Subject: [PATCH 01/17] add IterableDataset support in multiprocess
 DataLoader. test=develop

---
 .../paddle/fluid/dataloader/batch_sampler.py  |  35 +++-
 .../fluid/dataloader/dataloader_iter.py       |  59 +++++-
 python/paddle/fluid/dataloader/dataset.py     |  55 +++++-
 python/paddle/fluid/dataloader/fetcher.py     |  53 ++++++
 python/paddle/fluid/reader.py                 |  31 +++-
 .../fluid/tests/unittests/CMakeLists.txt      |   2 +
 ...ultiprocess_dataloader_iterable_dataset.py | 173 ++++++++++++++++++
 python/paddle/io/__init__.py                  |   3 +-
 8 files changed, 386 insertions(+), 25 deletions(-)
 create mode 100644 python/paddle/fluid/dataloader/fetcher.py
 create mode 100644 python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_iterable_dataset.py

diff --git a/python/paddle/fluid/dataloader/batch_sampler.py b/python/paddle/fluid/dataloader/batch_sampler.py
index c6163f7da1ee6..685a7d15d486e 100644
--- a/python/paddle/fluid/dataloader/batch_sampler.py
+++ b/python/paddle/fluid/dataloader/batch_sampler.py
@@ -16,7 +16,7 @@
 from __future__ import division
 
 import numpy as np
-from .dataset import Dataset
+from .dataset import Dataset, IterableDataset
 
 __all__ = ["BatchSampler"]
 
@@ -107,11 +107,16 @@ def __init__(self,
                 "indices should be a list or tuple, but got {}".format(type(indices))
             self.indices = indices
         else:
-            assert isinstance(dataset, Dataset), \
-                "dataset should be an instance of paddle.io.Dataset"
-            assert indices is None, \
-                "should not set both dataset and indices"
-            self.indices = list(range(len(dataset)))
+            if isinstance(dataset, IterableDataset):
+                self.sampler_iter = iter(
+                    _InfiniteIterableSampler(dataset, batch_size))
+            else:
+                self.sampler_iter = None
+                assert isinstance(dataset, Dataset), \
+                    "dataset should be an instance of paddle.io.Dataset"
+                assert indices is None, \
+                    "should not set both dataset and indices"
+                self.indices = list(range(len(dataset)))
 
         assert isinstance(batch_size, int) and batch_size > 0, \
             "batch_size should be a positive integer, but got {}".format(batch_size)
@@ -124,6 +129,9 @@ def __init__(self,
         self.drop_last = drop_last
 
     def __iter__(self):
+        if self.sampler_iter:
+            yield next(self.sampler_iter)
+
         if self.shuffle:
             np.random.shuffle(self.indices)
         _iter = iter(self.indices)
@@ -138,6 +146,21 @@ def __iter__(self):
             yield batch_indices
 
     def __len__(self):
+        if self.sampler_iter:
+            raise RuntimeError("'{}' should not be called for IterableDataset".
+                               format('__len__'))
         num_samples = len(self.indices)
         num_samples += int(not self.drop_last) * (self.batch_size - 1)
         return num_samples // self.batch_size
+
+
+class _InfiniteIterableSampler(object):
+    def __init__(self, dataset, batch_size=1):
+        assert isinstance(
+            dataset, IterableDataset
+        ), "dataset should be an instance of paddle.io.IterableDataset"
+        self.dataset = dataset
+        self.batch_size = batch_size
+
+    def __iter__(self):
+        yield [None] * self.batch_size
diff --git a/python/paddle/fluid/dataloader/dataloader_iter.py b/python/paddle/fluid/dataloader/dataloader_iter.py
index 6753c18da4649..445411bd47e43 100644
--- a/python/paddle/fluid/dataloader/dataloader_iter.py
+++ b/python/paddle/fluid/dataloader/dataloader_iter.py
@@ -22,6 +22,7 @@
 import threading
 import numpy as np
 import multiprocessing
+from collections import namedtuple
 
 # NOTE: queue has a different name in python2 and python3
 if six.PY2:
@@ -32,11 +33,15 @@
 from .. 
import core from ..framework import in_dygraph_mode from ..multiprocess_utils import CleanupFuncRegistrar, _cleanup_mmap, _set_SIGCHLD_handler +from .fetcher import _IterableDatasetFetcher, _MapDatasetFetcher # multi-process worker check indices queue interval, avoid # hanging in subprocess data loading MP_INDICES_CHECK_INTERVAL = 5 +_IterableDatasetStopIteration = namedtuple('_IterableDatasetStopIteration', + ['worker_id']) + def _default_collate_fn(batch): sample = batch[0] @@ -55,6 +60,20 @@ def _default_collate_fn(batch): return [np.stack(slot, axis=0) for slot in slots] +class _DatasetKind(object): + MAP = 0 + ITER = 1 + + @staticmethod + def create_fetcher(kind, dataset, collate_fn, drop_last): + if kind == _DatasetKind.MAP: + return _MapDatasetFetcher(dataset, collate_fn, drop_last) + elif kind == _DatasetKind.ITER: + return _IterableDatasetFetcher(dataset, collate_fn, drop_last) + else: + raise NotImplementedError("unknown Dataset kind {}".format(kind)) + + class ParentWatchDog(object): def __init__(self): self._parent_pid = os.getppid() @@ -88,6 +107,7 @@ def __init__(self, loader): self._use_shared_memory = loader.use_shared_memory self._timeout = loader.timeout if loader.timeout > 0 else MP_INDICES_CHECK_INTERVAL self._worker_init_fn = loader.worker_init_fn + self._dataset_kind = loader.dataset_kind # LoDTensorBlockingQueue instance for create_py_reader and a thread # to put mini-batch data to self._blocking_queue, mini-batch data @@ -114,6 +134,9 @@ class _DataLoaderIterSingleProcess(_DataLoaderIterBase): def __init__(self, loader): super(_DataLoaderIterSingleProcess, self).__init__(loader) + self._dataset_fetcher = _DatasetKind.create_fetcher( + self._dataset_kind, self._dataset, self._collate_fn, True) + # NOTE: len(self._places) batch data compose as an output # iteration, set blocking_queue can cache 2 iteration datas # at most here @@ -143,10 +166,11 @@ def _init_thread(self): def _thread_loop(self): try: for indices in self._sampler_iter: - # read data from dataset in mini-batch - batch = [self._dataset[i] for i in indices] - if self._collate_fn is not None: - batch = self._collate_fn(batch) + # # read data from dataset in mini-batch + # batch = [self._dataset[i] for i in indices] + # if self._collate_fn is not None: + # batch = self._collate_fn(batch) + batch = self._dataset_fetcher.fetch(indices) # pack as LoDTensorArray array = core.LoDTensorArray() @@ -346,10 +370,13 @@ def _worker_loop(self, dataset, indices_queue, out_queue, done_event, if init_fn is not None: try: init_fn(worker_id) + fetcher = _DatasetKind.create_fetcher(dataset_kind, dataset, + collate_fn, True) except: init_exception = Exception("init_fn failed in worker {}: " \ "{}".format(worker_id, sys.exc_info())) + iterator_drained = False parent_watch_dog = ParentWatchDog() while parent_watch_dog.is_alive(): @@ -360,12 +387,12 @@ def _worker_loop(self, dataset, indices_queue, out_queue, done_event, # None as poison piil, so worker event should be set if data is None: - assert done_event.is_set( - ), "get None when worker done_event set" + assert done_event.is_set() or iterator_drained, \ + "get None when worker done_event set" break # If worker done event is set but get still get data in # indices_queue, remaining data should be get and skipped. 
-                if done_event.is_set():
+                if done_event.is_set() or iterator_drained:
                     continue
 
                 idx, indices = data
@@ -374,10 +401,16 @@
                         batch = init_exception
                         init_exception = None
                     else:
-                        batch = [dataset[i] for i in indices]
-                        if self._collate_fn is not None:
-                            batch = self._collate_fn(batch)
+                        # batch = [dataset[i] for i in indices]
+                        # if self._collate_fn is not None:
+                        #     batch = self._collate_fn(batch)
+                        batch = fetcher.fetch(indices)
                 except Exception as e:
+                    if isinstance(
+                            e,
+                            StopIteration) and dataset_kind == _DatasetKind.ITER:
+                        out_queue.put(_IterableDatasetStopIteration(worker_id))
+                        iterator_drained = True
                     out_queue.put((idx, e))
                 else:
                     if self._use_shared_memory:
@@ -471,6 +504,12 @@
                             "workers' result queue.".format(e))
                     six.reraise(*sys.exc_info())
             else:
+                if self._dataset_kind == _DatasetKind.ITER and isinstance(
+                        data, _IterableDatasetStopIteration):
+                    self._shutdown_worker(data.worker_id)
+                    self._try_put_indices()
+                    continue
+
                 idx, batch = data
                 if idx == self._rcvd_idx:
                     return batch
diff --git a/python/paddle/fluid/dataloader/dataset.py b/python/paddle/fluid/dataloader/dataset.py
index b49ceaddefdef..6ea5f6c8ed85a 100644
--- a/python/paddle/fluid/dataloader/dataset.py
+++ b/python/paddle/fluid/dataloader/dataset.py
@@ -16,7 +16,7 @@
 
 import paddle.dataset.common
 
-__all__ = ["Dataset"]
+__all__ = ["Dataset", "IterableDataset"]
 
 
 class Dataset(object):
@@ -71,3 +71,56 @@ def __getitem__(self, idx):
     def __len__(self):
         raise NotImplementedError("'{}' not implement in class "\
                 "{}".format('__len__', self.__class__.__name__))
+
+
+class IterableDataset(Dataset):
+    """
+    An abstract class to encapsulate methods and behaviors of iterable datasets.
+
+    All datasets in iterable-style (can only get sample one by one sequentially, like
+    a python iterator) should be a subclass of `paddle.io.IterableDataset`. All subclasses should
+    implement the following method:
+
+    :code:`__iter__`: yield sample sequentially. This method is required by
+    reading dataset sample in :code:`paddle.io.DataLoader`.
+
+    see :code:`paddle.io.DataLoader`.
+
+    Examples:
+
+        .. code-block:: python
+
+            import numpy as np
+            from paddle.io import Dataset
+
+            # define a random dataset
+            class RandomDataset(Dataset):
+                def __init__(self, num_samples):
+                    self.num_samples = num_samples
+
+                def __iter__(self, idx):
+                    for i in range(self.num_samples)
+                        image = np.random.random([784]).astype('float32')
+                        label = np.random.randint(0, 9, (1, )).astype('int64')
+                        yield image, label
+
+            dataset = RandomDataset(10)
+            for img, lbl in iter(dataset):
+                print(img, lbl)
+
+    """
+
+    def __init__(self):
+        pass
+
+    def __iter__(self):
+        raise NotImplementedError("'{}' not implement in class "\
+                "{}".format('__iter__', self.__class__.__name__))
+
+    def __getitem__(self, idx):
+        raise RuntimeError("'{}' should not be called for IterableDataset "\
+                "{}".format('__getitem__', self.__class__.__name__))
+
+    def __len__(self):
+        raise RuntimeError("'{}' should not be called for IterableDataset "\
+                "{}".format('__len__', self.__class__.__name__))
diff --git a/python/paddle/fluid/dataloader/fetcher.py b/python/paddle/fluid/dataloader/fetcher.py
new file mode 100644
index 0000000000000..001b8b931da23
--- /dev/null
+++ b/python/paddle/fluid/dataloader/fetcher.py
@@ -0,0 +1,53 @@
+# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. 
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+class _DatasetFetcher(object):
+    def __init__(self, dataset, collate_fn, drop_last):
+        self.dataset = dataset
+        self.collate_fn = collate_fn
+        self.drop_last = drop_last
+
+    def fetch(self, batch_indices):
+        raise NotImplementedError("'fetch' not implemented for class {}".format(
+            self.__class__.__name__))
+
+
+class _IterableDatasetFetcher(_DatasetFetcher):
+    def __init__(self, dataset, collate_fn, drop_last):
+        super(_IterableDatasetFetcher, self).__init__(dataset, collate_fn,
+                                                      drop_last)
+        self.dataset_iter = iter(dataset)
+
+    def fetch(self, batch_indices):
+        data = []
+        for _ in batch_indices:
+            try:
+                data.append(next(self.dataset_iter))
+            except StopIteration:
+                break
+        if len(data) == 0 or (self.drop_last and
+                              len(data) < len(batch_indices)):
+            raise StopIteration
+
+        return self.collate_fn(data)
+
+
+class _MapDatasetFetcher(_DatasetFetcher):
+    def __init__(self, dataset, collate_fn, drop_last):
+        super(_MapDatasetFetcher, self).__init__(dataset, collate_fn, drop_last)
+
+    def fetch(self, batch_indices):
+        data = [self.dataset[idx] for idx in batch_indices]
+        return self.collate_fn(data)
diff --git a/python/paddle/fluid/reader.py b/python/paddle/fluid/reader.py
index ebe16a8bbbc31..eed96200b5d2a 100644
--- a/python/paddle/fluid/reader.py
+++ b/python/paddle/fluid/reader.py
@@ -22,8 +22,9 @@
 from .executor import global_scope
 from .data_feeder import DataFeeder, BatchedTensorProvider
 from .multiprocess_utils import multiprocess_queue_set, CleanupFuncRegistrar, _cleanup_mmap, _cleanup, _set_SIGCHLD_handler
-from .dataloader import BatchSampler, Dataset
-from .dataloader.dataloader_iter import _DataLoaderIterSingleProcess, _DataLoaderIterMultiProcess
+from .dataloader import BatchSampler, Dataset, IterableDataset
+from .dataloader.dataloader_iter import _DataLoaderIterSingleProcess, _DataLoaderIterMultiProcess, _DatasetKind
+from .dataloader.batch_sampler import _InfiniteIterableSampler
 from .layers.io import monkey_patch_reader_methods, _copy_reader_var_, double_buffer
 from .unique_name import UniqueNameGenerator
 import logging
@@ -339,6 +340,18 @@ def __init__(self,
         assert timeout >= 0, "timeout should be a non-negative value"
         self.timeout = timeout
 
+        if isinstance(dataset, IterableDataset):
+            self.dataset_kind = _DatasetKind.ITER
+            if shuffle:
+                raise ValueError(
+                    "IterableDataset does not support shuffle, but got shuffle={}".
+                    format(shuffle))
+            if batch_sampler is not None:
+                raise ValueError(
+                    "IterableDataset expects unspecified batch_sampler")
+        else:
+            self.dataset_kind = _DatasetKind.MAP
+
         if batch_sampler is not None:
             assert isinstance(batch_sampler, BatchSampler), \
                 "batch_sampler should be None or subclass instance " \
@@ -351,11 +364,15 @@ def __init__(self,
             assert batch_size is not None and batch_size > 0, \
                 "batch_size should be a positive value when " \
                 "batch_sampler is not given"
-            self.batch_sampler = BatchSampler(
-                dataset=dataset,
-                batch_size=batch_size,
-                shuffle=shuffle,
-                drop_last=drop_last)
+            if isinstance(dataset, IterableDataset):
+                self.batch_sampler = _InfiniteIterableSampler(dataset,
+                                                              batch_size)
+            else:
+                self.batch_sampler = BatchSampler(
+                    dataset=dataset,
+                    batch_size=batch_size,
+                    shuffle=shuffle,
+                    drop_last=drop_last)
 
     def __len__(self):
         return len(self.batch_sampler)
diff --git a/python/paddle/fluid/tests/unittests/CMakeLists.txt b/python/paddle/fluid/tests/unittests/CMakeLists.txt
index 90f7781b7a437..27712db412e30 100644
--- a/python/paddle/fluid/tests/unittests/CMakeLists.txt
+++ b/python/paddle/fluid/tests/unittests/CMakeLists.txt
@@ -249,6 +249,7 @@ if (APPLE OR WIN32)
     list(REMOVE_ITEM TEST_OPS test_multiprocess_dataloader_static)
     list(REMOVE_ITEM TEST_OPS test_multiprocess_dataloader_dynamic)
     list(REMOVE_ITEM TEST_OPS test_multiprocess_dataloader_exception)
+    list(REMOVE_ITEM TEST_OPS test_multiprocess_dataloader_iterable_dataset)
 endif()
 
 if(NOT WITH_GPU OR WIN32 OR APPLE)
@@ -427,4 +428,5 @@ if(NOT WIN32 AND NOT APPLE)
     set_tests_properties(test_multiprocess_dataloader_static PROPERTIES LABELS "RUN_TYPE=EXCLUSIVE")
     set_tests_properties(test_multiprocess_dataloader_dynamic PROPERTIES LABELS "RUN_TYPE=EXCLUSIVE")
     set_tests_properties(test_multiprocess_dataloader_exception PROPERTIES LABELS "RUN_TYPE=EXCLUSIVE")
+    set_tests_properties(test_multiprocess_dataloader_iterable_dataset PROPERTIES LABELS "RUN_TYPE=EXCLUSIVE")
 endif()
diff --git a/python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_iterable_dataset.py b/python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_iterable_dataset.py
new file mode 100644
index 0000000000000..0983d8decfb10
--- /dev/null
+++ b/python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_iterable_dataset.py
@@ -0,0 +1,173 @@
+# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License. 
+ +from __future__ import division + +import os +import sys +import six +import time +import unittest +import multiprocessing +import numpy as np + +import paddle.fluid as fluid +from paddle.io import IterableDataset, BatchSampler, DataLoader + +EPOCH_NUM = 3 +BATCH_SIZE = 8 +IMAGE_SIZE = 32 +SAMPLE_NUM = 100 +CLASS_NUM = 10 + + +class RandomDataset(IterableDataset): + def __init__(self, sample_num, class_num): + self.sample_num = sample_num + self.class_num = class_num + + def __iter__(self): + for i in range(self.sample_num): + np.random.seed(i) + image = np.random.random([IMAGE_SIZE]).astype('float32') + label = np.random.randint(0, self.class_num - 1, + (1, )).astype('int64') + yield image, label + + +def simple_fc_net_static(): + startup_prog = fluid.Program() + main_prog = fluid.Program() + startup_prog.random_seed = 1 + main_prog.random_seed = 1 + + with fluid.unique_name.guard(): + with fluid.program_guard(main_prog, startup_prog): + image = fluid.data( + name='image', shape=[None, IMAGE_SIZE], dtype='float32') + label = fluid.data(name='label', shape=[None, 1], dtype='int64') + hidden = image + param_attr = fluid.ParamAttr(initializer=fluid.initializer.Constant( + value=0.8)) + bias_attr = fluid.ParamAttr(initializer=fluid.initializer.Constant( + value=0.5)) + for hidden_size in [10, 20, 30]: + hidden = fluid.layers.fc(hidden, + size=hidden_size, + act='tanh', + param_attr=param_attr, + bias_attr=bias_attr) + + predict_label = fluid.layers.fc(hidden, + size=CLASS_NUM, + act='softmax', + param_attr=param_attr, + bias_attr=bias_attr) + loss = fluid.layers.reduce_mean( + fluid.layers.cross_entropy( + input=predict_label, label=label)) + + optimizer = fluid.optimizer.Adam() + optimizer.minimize(loss) + return startup_prog, main_prog, image, label, loss + + +def prepare_places(with_data_parallel, with_cpu=False, with_gpu=True): + places = [] + if with_cpu: + places.append([fluid.CPUPlace()]) + if with_data_parallel: + places.append([fluid.CPUPlace()] * 2) + + if with_gpu and fluid.core.is_compiled_with_cuda(): + tmp = fluid.cuda_places()[:2] + assert len(tmp) > 0, "no gpu detected" + if with_data_parallel: + places.append(tmp) + places.append([tmp[0]]) + return places + + +class TestStaticDataLoader(unittest.TestCase): + def run_main(self, num_workers, places): + scope = fluid.Scope() + with fluid.scope_guard(scope): + startup_prog, main_prog, image, label, loss = simple_fc_net_static() + + dataset = RandomDataset(SAMPLE_NUM, CLASS_NUM) + dataloader = DataLoader( + dataset, + feed_list=[image, label], + places=places, + num_workers=num_workers, + batch_size=BATCH_SIZE, + drop_last=True) + # assert len(dataloader) == int(SAMPLE_NUM / BATCH_SIZE) + + exe = fluid.Executor(place=places[0]) + exe.run(startup_prog) + + prog = fluid.CompiledProgram(main_prog) + if len(places) > 1: + prog = prog.with_data_parallel( + loss_name=loss.name, places=places) + + step_list = [] + loss_list = [] + start_t = time.time() + for _ in six.moves.range(EPOCH_NUM): + step = 0 + for d in dataloader: + assert len(d) == len(places), "{} != {}".format( + len(d), len(places)) + for i, item in enumerate(d): + image = item['image'] + label = item['label'] + assert image.shape() == [BATCH_SIZE, IMAGE_SIZE] + assert label.shape() == [BATCH_SIZE, 1] + assert image._place()._equals(places[i]) + assert label._place()._equals(places[i]) + L, = exe.run(program=prog, + feed=d, + fetch_list=[loss], + use_program_cache=True) + loss_list.append(np.mean(L)) + step += 1 + step_list.append(step) + + end_t = time.time() + ret = { 
+ "time": end_t - start_t, + "step": step_list, + "loss": np.array(loss_list) + } + print("time cost", ret['time'], 'step_list', ret['step']) + return ret + + def test_main(self): + for p in prepare_places(True): + results = [] + for num_workers in [0, 2]: + print(self.__class__.__name__, p, num_workers) + sys.stdout.flush() + ret = self.run_main(num_workers=num_workers, places=p) + results.append(ret) + diff = np.max( + np.abs(results[0]['loss'] - results[1]['loss']) / + np.abs(results[0]['loss'])) + self.assertLess(diff, 1e-2) + + +if __name__ == '__main__': + unittest.main() diff --git a/python/paddle/io/__init__.py b/python/paddle/io/__init__.py index f9c083d2aeeee..28b6157870a32 100644 --- a/python/paddle/io/__init__.py +++ b/python/paddle/io/__init__.py @@ -15,6 +15,7 @@ # TODO: define all functions about input & output in this directory __all__ = [ 'Dataset', + 'IterableDataset', 'BatchSampler', # 'Transform', 'DataLoader', @@ -36,7 +37,7 @@ ] from ..fluid.io import DataLoader -from ..fluid.dataloader import Dataset, BatchSampler +from ..fluid.dataloader import Dataset, IterableDataset, BatchSampler from ..fluid.io import load, save, load_program_state, set_program_state, \ load_inference_model, save_inference_model, batch from ..reader import shuffle, buffered, cache, chain, firstn, compose, map_readers, xmap_readers From 7575a0f2a057174009394840375151c8dc6e402d Mon Sep 17 00:00:00 2001 From: dengkaipeng Date: Wed, 15 Jul 2020 16:06:45 +0000 Subject: [PATCH 02/17] fix single process exit. test=develop --- .../fluid/dataloader/dataloader_iter.py | 16 ++- ...ess_dataloader_iterable_dataset_dynamic.py | 126 ++++++++++++++++++ ...ess_dataloader_iterable_dataset_static.py} | 3 +- 3 files changed, 143 insertions(+), 2 deletions(-) create mode 100644 python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_iterable_dataset_dynamic.py rename python/paddle/fluid/tests/unittests/{test_multiprocess_dataloader_iterable_dataset.py => test_multiprocess_dataloader_iterable_dataset_static.py} (98%) diff --git a/python/paddle/fluid/dataloader/dataloader_iter.py b/python/paddle/fluid/dataloader/dataloader_iter.py index 445411bd47e43..cec4d1f8d5361 100644 --- a/python/paddle/fluid/dataloader/dataloader_iter.py +++ b/python/paddle/fluid/dataloader/dataloader_iter.py @@ -188,6 +188,8 @@ def _thread_loop(self): self._blocking_queue.close() self._thread = None + except StopIteration: + self._blocking_queue.close() except Exception: self._blocking_queue.kill() self._thread = None @@ -380,13 +382,20 @@ def _worker_loop(self, dataset, indices_queue, out_queue, done_event, parent_watch_dog = ParentWatchDog() while parent_watch_dog.is_alive(): + print("enterrrrrrrrr") + import sys + sys.stdout.flush() try: data = indices_queue.get(MP_INDICES_CHECK_INTERVAL) except queue.Empty: continue + print("worker data", data) + sys.stdout.flush() + # None as poison piil, so worker event should be set if data is None: + print("worker {} get None to exit".format(worker_id)) assert done_event.is_set() or iterator_drained, \ "get None when worker done_event set" break @@ -404,14 +413,19 @@ def _worker_loop(self, dataset, indices_queue, out_queue, done_event, # batch = [dataset[i] for i in indices] # if self._collate_fn is not None: # batch = self._collate_fn(batch) + print("indices", indices) + sys.stdout.flush() batch = fetcher.fetch(indices) + print("batch", batch) + sys.stdout.flush() except Exception as e: if isinstance( e, StopIteration) and dataset_kind == _DatasetKind.ITER: 
out_queue.put(_IterableDatasetStopIteration(worker_id)) iterator_drained = True - out_queue.put((idx, e)) + else: + out_queue.put((idx, e)) else: if self._use_shared_memory: tensor_list = core._convert_to_tensor_list(batch) diff --git a/python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_iterable_dataset_dynamic.py b/python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_iterable_dataset_dynamic.py new file mode 100644 index 0000000000000..de71afed6654e --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_iterable_dataset_dynamic.py @@ -0,0 +1,126 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import division + +import os +import sys +import six +import time +import unittest +import multiprocessing +import numpy as np + +import paddle.fluid as fluid +from paddle.io import Dataset, BatchSampler, DataLoader +from paddle.fluid.dygraph.nn import Linear +from paddle.fluid.dygraph.base import to_variable + +from test_multiprocess_dataloader_static import RandomDataset, prepare_places +from test_multiprocess_dataloader_static import EPOCH_NUM, BATCH_SIZE, IMAGE_SIZE, SAMPLE_NUM, CLASS_NUM + + +class SimpleFCNet(fluid.dygraph.Layer): + def __init__(self): + super(SimpleFCNet, self).__init__() + + param_attr = fluid.ParamAttr(initializer=fluid.initializer.Constant( + value=0.8)) + bias_attr = fluid.ParamAttr(initializer=fluid.initializer.Constant( + value=0.5)) + self._fcs = [] + in_channel = IMAGE_SIZE + for hidden_size in [10, 20, 30]: + self._fcs.append( + Linear( + in_channel, + hidden_size, + act='tanh', + param_attr=param_attr, + bias_attr=bias_attr)) + in_channel = hidden_size + self._fcs.append( + Linear( + in_channel, + CLASS_NUM, + act='softmax', + param_attr=param_attr, + bias_attr=bias_attr)) + + def forward(self, image): + out = image + for fc in self._fcs: + out = fc(out) + return out + + +class TestDygraphDataLoader(unittest.TestCase): + def run_main(self, num_workers, places): + fluid.default_startup_program().random_seed = 1 + fluid.default_main_program().random_seed = 1 + with fluid.dygraph.guard(places[0]): + fc_net = SimpleFCNet() + optimizer = fluid.optimizer.Adam(parameter_list=fc_net.parameters()) + + dataset = RandomDataset(SAMPLE_NUM, CLASS_NUM) + dataloader = DataLoader( + dataset, + places=places, + num_workers=num_workers, + batch_size=BATCH_SIZE, + drop_last=True) + + step_list = [] + loss_list = [] + start_t = time.time() + for _ in six.moves.range(EPOCH_NUM): + step = 0 + for image, label in dataloader(): + out = fc_net(image) + loss = fluid.layers.cross_entropy(out, label) + avg_loss = fluid.layers.reduce_mean(loss) + avg_loss.backward() + optimizer.minimize(avg_loss) + fc_net.clear_gradients() + + loss_list.append(np.mean(avg_loss.numpy())) + step += 1 + step_list.append(step) + + end_t = time.time() + ret = { + "time": end_t - start_t, + "step": step_list, + "loss": np.array(loss_list) + } + print("time cost", ret['time'], 
'step_list', ret['step']) + return ret + + def test_main(self): + # dynamic graph do not run with_data_parallel + for p in prepare_places(False): + results = [] + for num_workers in [0, 2]: + print(self.__class__.__name__, p, num_workers) + sys.stdout.flush() + ret = self.run_main(num_workers=num_workers, places=p) + results.append(ret) + diff = np.max( + np.abs(results[0]['loss'] - results[1]['loss']) / + np.abs(results[0]['loss'])) + self.assertLess(diff, 1e-2) + + +if __name__ == '__main__': + unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_iterable_dataset.py b/python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_iterable_dataset_static.py similarity index 98% rename from python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_iterable_dataset.py rename to python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_iterable_dataset_static.py index 0983d8decfb10..3cf7f804ca61f 100644 --- a/python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_iterable_dataset.py +++ b/python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_iterable_dataset_static.py @@ -126,7 +126,8 @@ def run_main(self, num_workers, places): step_list = [] loss_list = [] start_t = time.time() - for _ in six.moves.range(EPOCH_NUM): + for i in six.moves.range(EPOCH_NUM): + print("epoch ", i, "enterxxxxxxxxxx") step = 0 for d in dataloader: assert len(d) == len(places), "{} != {}".format( From f3da524322ab5b56fd07f6b5ab93217a84655c25 Mon Sep 17 00:00:00 2001 From: dengkaipeng Date: Wed, 22 Jul 2020 14:36:40 +0000 Subject: [PATCH 03/17] epoch end success. test=develop --- .../paddle/fluid/dataloader/batch_sampler.py | 3 +- .../fluid/dataloader/dataloader_iter.py | 53 +++++++++---------- .../fluid/tests/unittests/CMakeLists.txt | 3 +- ...ess_dataloader_iterable_dataset_dynamic.py | 10 ++-- ...cess_dataloader_iterable_dataset_static.py | 7 +-- 5 files changed, 35 insertions(+), 41 deletions(-) diff --git a/python/paddle/fluid/dataloader/batch_sampler.py b/python/paddle/fluid/dataloader/batch_sampler.py index 685a7d15d486e..5347122a387b0 100644 --- a/python/paddle/fluid/dataloader/batch_sampler.py +++ b/python/paddle/fluid/dataloader/batch_sampler.py @@ -163,4 +163,5 @@ def __init__(self, dataset, batch_size=1): self.batch_size = batch_size def __iter__(self): - yield [None] * self.batch_size + while True: + yield [None] * self.batch_size diff --git a/python/paddle/fluid/dataloader/dataloader_iter.py b/python/paddle/fluid/dataloader/dataloader_iter.py index cec4d1f8d5361..50ff00d7b5be7 100644 --- a/python/paddle/fluid/dataloader/dataloader_iter.py +++ b/python/paddle/fluid/dataloader/dataloader_iter.py @@ -252,14 +252,14 @@ def __init__(self, loader): self._outstanding_capacity = 2 * max(self._num_workers, len(self._places)) + # init workers and indices queues and put 2 indices in each indices queue self._init_workers() - self._init_thread() - - self._shutdown = False - for _ in range(self._outstanding_capacity): self._try_put_indices() + self._init_thread() + self._shutdown = False + def _init_workers(self): # multiprocess worker and indice queue list initial as empty self._workers = [] @@ -280,9 +280,9 @@ def _init_workers(self): self._indices_queues.append(indices_queue) worker = multiprocessing.Process( target=self._worker_loop, - args=(self._dataset, indices_queue, self._data_queue, - self._workers_done_event, self._collate_fn, - self._worker_init_fn, i)) + args=(self._dataset, self._dataset_kind, indices_queue, + 
self._data_queue, self._workers_done_event, + self._collate_fn, self._worker_init_fn, i)) worker.daemon = True worker.start() self._workers.append(worker) @@ -356,8 +356,8 @@ def _exit_thread_unexpectedly(self): self._blocking_queue.kill() logging.error("DataLoader reader thread raised an exception!") - def _worker_loop(self, dataset, indices_queue, out_queue, done_event, - collate_fn, init_fn, worker_id): + def _worker_loop(self, dataset, dataset_kind, indices_queue, out_queue, + done_event, collate_fn, init_fn, worker_id): try: # NOTE: [ mmap files clear ] When the child process exits unexpectedly, # some shared memory objects may have been applied for but have not yet @@ -369,33 +369,26 @@ def _worker_loop(self, dataset, indices_queue, out_queue, done_event, core._set_process_signal_handler() init_exception = None - if init_fn is not None: - try: + try: + if init_fn is not None: init_fn(worker_id) - fetcher = _DatasetKind.create_fetcher(dataset_kind, dataset, - collate_fn, True) - except: - init_exception = Exception("init_fn failed in worker {}: " \ - "{}".format(worker_id, sys.exc_info())) + fetcher = _DatasetKind.create_fetcher(dataset_kind, dataset, + collate_fn, True) + except: + init_exception = Exception("init_fn failed in worker {}: " \ + "{}".format(worker_id, sys.exc_info())) iterator_drained = False parent_watch_dog = ParentWatchDog() while parent_watch_dog.is_alive(): - print("enterrrrrrrrr") - import sys - sys.stdout.flush() try: data = indices_queue.get(MP_INDICES_CHECK_INTERVAL) except queue.Empty: continue - print("worker data", data) - sys.stdout.flush() - # None as poison piil, so worker event should be set if data is None: - print("worker {} get None to exit".format(worker_id)) assert done_event.is_set() or iterator_drained, \ "get None when worker done_event set" break @@ -413,11 +406,7 @@ def _worker_loop(self, dataset, indices_queue, out_queue, done_event, # batch = [dataset[i] for i in indices] # if self._collate_fn is not None: # batch = self._collate_fn(batch) - print("indices", indices) - sys.stdout.flush() batch = fetcher.fetch(indices) - print("batch", batch) - sys.stdout.flush() except Exception as e: if isinstance( e, @@ -521,6 +510,7 @@ def _get_data(self): if self._dataset_kind == _DatasetKind.ITER and isinstance( data, _IterableDatasetStopIteration): self._shutdown_worker(data.worker_id) + self._batches_outstanding -= 1 self._try_put_indices() continue @@ -532,6 +522,7 @@ def _get_data(self): continue def _try_put_indices(self): + # assert self._batches_outstanding <= self._outstanding_capacity, \ assert self._send_idx - self._rcvd_idx <= self._outstanding_capacity, \ "too many indices have been put to queue" try: @@ -539,7 +530,13 @@ def _try_put_indices(self): except StopIteration: return - worker_idx = next(self._workers_idx_cycle) + for i in range(self._num_workers): + worker_idx = next(self._workers_idx_cycle) + if self._worker_status[worker_idx]: + break + else: + return + self._indices_queues[worker_idx].put((self._send_idx, indices)) self._batches_outstanding += 1 self._send_idx += 1 diff --git a/python/paddle/fluid/tests/unittests/CMakeLists.txt b/python/paddle/fluid/tests/unittests/CMakeLists.txt index 27712db412e30..d9546238c2da4 100644 --- a/python/paddle/fluid/tests/unittests/CMakeLists.txt +++ b/python/paddle/fluid/tests/unittests/CMakeLists.txt @@ -428,5 +428,6 @@ if(NOT WIN32 AND NOT APPLE) set_tests_properties(test_multiprocess_dataloader_static PROPERTIES LABELS "RUN_TYPE=EXCLUSIVE") 
set_tests_properties(test_multiprocess_dataloader_dynamic PROPERTIES LABELS "RUN_TYPE=EXCLUSIVE") set_tests_properties(test_multiprocess_dataloader_exception PROPERTIES LABELS "RUN_TYPE=EXCLUSIVE") - set_tests_properties(test_multiprocess_dataloader_iterable_dataset PROPERTIES LABELS "RUN_TYPE=EXCLUSIVE") + set_tests_properties(test_multiprocess_dataloader_iterable_dataset_static PROPERTIES LABELS "RUN_TYPE=EXCLUSIVE") + set_tests_properties(test_multiprocess_dataloader_iterable_dataset_dynamic PROPERTIES LABELS "RUN_TYPE=EXCLUSIVE") endif() diff --git a/python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_iterable_dataset_dynamic.py b/python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_iterable_dataset_dynamic.py index de71afed6654e..8f0209406fdff 100644 --- a/python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_iterable_dataset_dynamic.py +++ b/python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_iterable_dataset_dynamic.py @@ -27,8 +27,8 @@ from paddle.fluid.dygraph.nn import Linear from paddle.fluid.dygraph.base import to_variable -from test_multiprocess_dataloader_static import RandomDataset, prepare_places -from test_multiprocess_dataloader_static import EPOCH_NUM, BATCH_SIZE, IMAGE_SIZE, SAMPLE_NUM, CLASS_NUM +from test_multiprocess_dataloader_iterable_dataset_static import RandomDataset, prepare_places +from test_multiprocess_dataloader_iterable_dataset_static import EPOCH_NUM, BATCH_SIZE, IMAGE_SIZE, SAMPLE_NUM, CLASS_NUM class SimpleFCNet(fluid.dygraph.Layer): @@ -116,10 +116,8 @@ def test_main(self): sys.stdout.flush() ret = self.run_main(num_workers=num_workers, places=p) results.append(ret) - diff = np.max( - np.abs(results[0]['loss'] - results[1]['loss']) / - np.abs(results[0]['loss'])) - self.assertLess(diff, 1e-2) + assert results[0]['loss'].shape[0] * 2 == results[1]['loss'].shape[ + 0] if __name__ == '__main__': diff --git a/python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_iterable_dataset_static.py b/python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_iterable_dataset_static.py index 3cf7f804ca61f..20d4131334543 100644 --- a/python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_iterable_dataset_static.py +++ b/python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_iterable_dataset_static.py @@ -127,7 +127,6 @@ def run_main(self, num_workers, places): loss_list = [] start_t = time.time() for i in six.moves.range(EPOCH_NUM): - print("epoch ", i, "enterxxxxxxxxxx") step = 0 for d in dataloader: assert len(d) == len(places), "{} != {}".format( @@ -164,10 +163,8 @@ def test_main(self): sys.stdout.flush() ret = self.run_main(num_workers=num_workers, places=p) results.append(ret) - diff = np.max( - np.abs(results[0]['loss'] - results[1]['loss']) / - np.abs(results[0]['loss'])) - self.assertLess(diff, 1e-2) + assert results[0]['loss'].shape[0] * 2 == results[1]['loss'].shape[ + 0] if __name__ == '__main__': From a1d5456f531ae6c9c548e625110d3087028eb212 Mon Sep 17 00:00:00 2001 From: dengkaipeng Date: Tue, 28 Jul 2020 15:00:02 +0000 Subject: [PATCH 04/17] unittest success. 
test=develop --- python/paddle/fluid/dataloader/__init__.py | 6 +- .../fluid/dataloader/dataloader_iter.py | 65 ++++++++-- python/paddle/fluid/dataloader/dataset.py | 3 + ...ocess_dataloader_iterable_dataset_split.py | 113 ++++++++++++++++++ ...cess_dataloader_iterable_dataset_static.py | 2 +- python/paddle/io/__init__.py | 3 +- 6 files changed, 179 insertions(+), 13 deletions(-) create mode 100644 python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_iterable_dataset_split.py diff --git a/python/paddle/fluid/dataloader/__init__.py b/python/paddle/fluid/dataloader/__init__.py index 62aefd6aec8cb..2f15811e4f360 100644 --- a/python/paddle/fluid/dataloader/__init__.py +++ b/python/paddle/fluid/dataloader/__init__.py @@ -20,5 +20,9 @@ from . import batch_sampler from .batch_sampler import * +from . import dataloader_iter +from .dataloader_iter import * + __all__ = dataset.__all__ \ - + batch_sampler.__all__ + + batch_sampler.__all__ \ + + dataloader_iter.__all__ diff --git a/python/paddle/fluid/dataloader/dataloader_iter.py b/python/paddle/fluid/dataloader/dataloader_iter.py index 50ff00d7b5be7..19cba27c91609 100644 --- a/python/paddle/fluid/dataloader/dataloader_iter.py +++ b/python/paddle/fluid/dataloader/dataloader_iter.py @@ -35,6 +35,8 @@ from ..multiprocess_utils import CleanupFuncRegistrar, _cleanup_mmap, _set_SIGCHLD_handler from .fetcher import _IterableDatasetFetcher, _MapDatasetFetcher +__all__ = ['get_worker_info'] + # multi-process worker check indices queue interval, avoid # hanging in subprocess data loading MP_INDICES_CHECK_INTERVAL = 5 @@ -85,6 +87,28 @@ def is_alive(self): return self._parent_alive +_worker_info = None + + +def get_worker_info(): + return _worker_info + + +class WorkerInfo(object): + __initialized = False + + def __init__(self, **kwargs): + for k, v in kwargs.items(): + setattr(self, k, v) + self.__initialized = True + + def __setattr__(self, key, val): + if self.__initialized: + raise RuntimeError("Cannot assign attributes to {} objects".format( + self.__class__.__name__)) + return super(WorkerInfo, self).__setattr__(key, val) + + class _DataLoaderIterBase(object): """ Iterator implement of DataLoader, will load and feed mini-batch @@ -237,11 +261,11 @@ def __init__(self, loader): # data get from _data_queue will be reordered by _rcvd_idx # for data order keeping, data index not equal _rcvd_idx - # will be cached in _reorder_dict + # will be cached in _task_infos self._send_idx = 0 self._rcvd_idx = 0 self._batches_outstanding = 0 - self._reorder_dict = {} + self._task_infos = {} # indices outstand as _outstanding_capacity at first, and # blocking_queue capacity is also _outstanding_capacity. 
@@ -282,7 +306,8 @@ def _init_workers(self): target=self._worker_loop, args=(self._dataset, self._dataset_kind, indices_queue, self._data_queue, self._workers_done_event, - self._collate_fn, self._worker_init_fn, i)) + self._collate_fn, self._worker_init_fn, i, + self._num_workers)) worker.daemon = True worker.start() self._workers.append(worker) @@ -357,7 +382,7 @@ def _exit_thread_unexpectedly(self): logging.error("DataLoader reader thread raised an exception!") def _worker_loop(self, dataset, dataset_kind, indices_queue, out_queue, - done_event, collate_fn, init_fn, worker_id): + done_event, collate_fn, init_fn, worker_id, num_workers): try: # NOTE: [ mmap files clear ] When the child process exits unexpectedly, # some shared memory objects may have been applied for but have not yet @@ -368,6 +393,10 @@ def _worker_loop(self, dataset, dataset_kind, indices_queue, out_queue, # set signal handler core._set_process_signal_handler() + global _worker_info + _worker_info = WorkerInfo( + id=worker_id, num_workers=num_workers, dataset=dataset) + init_exception = None try: if init_fn is not None: @@ -466,10 +495,25 @@ def _thread_loop(self): self._rcvd_idx += 1 def _get_data(self): - if self._rcvd_idx in self._reorder_dict.keys(): - return self._reorder_dict.pop(self._rcvd_idx) - while not self._thread_done_event.is_set(): + while self._rcvd_idx < self._send_idx: + info = self._task_infos[self._rcvd_idx] + if len(info) == 2 or self._worker_status[info[0]]: + break + del self._task_infos[self._rcvd_idx] + self._rcvd_idx += 1 + self._batches_outstanding -= 1 + else: + # continue + # # self._shutdown_workers() + # # raise StopIteration + if self._batches_outstanding < len(self._places): + return None + continue + + if len(self._task_infos[self._rcvd_idx]) == 2: + return self._task_infos.pop(self._rcvd_idx)[1] + try: # [ avoid hang ]: main process may blocking at _reader.read_next when # KeyboardInterrupt, we do following tradeoff: @@ -516,14 +560,14 @@ def _get_data(self): idx, batch = data if idx == self._rcvd_idx: + del self._task_infos[idx] return batch else: - self._reorder_dict[idx] = batch + self._task_infos[idx] += (batch, ) continue def _try_put_indices(self): - # assert self._batches_outstanding <= self._outstanding_capacity, \ - assert self._send_idx - self._rcvd_idx <= self._outstanding_capacity, \ + assert self._batches_outstanding <= self._outstanding_capacity, \ "too many indices have been put to queue" try: indices = next(self._sampler_iter) @@ -538,6 +582,7 @@ def _try_put_indices(self): return self._indices_queues[worker_idx].put((self._send_idx, indices)) + self._task_infos[self._send_idx] = (worker_idx, ) self._batches_outstanding += 1 self._send_idx += 1 diff --git a/python/paddle/fluid/dataloader/dataset.py b/python/paddle/fluid/dataloader/dataset.py index 6ea5f6c8ed85a..f4e0e3eaeb530 100644 --- a/python/paddle/fluid/dataloader/dataset.py +++ b/python/paddle/fluid/dataloader/dataset.py @@ -84,6 +84,9 @@ class IterableDataset(Dataset): :code:`__iter__`: yield sample sequentially. This method is required by reading dataset sample in :code:`paddle.io.DataLoader`. + NOTE: do not need to implement :code:`__getitem__` and :code:`__len__` in IterableDataset, + should not be called either. + see :code:`paddle.io.DataLoader`. 
Examples: diff --git a/python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_iterable_dataset_split.py b/python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_iterable_dataset_split.py new file mode 100644 index 0000000000000..3288d9c10231b --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_iterable_dataset_split.py @@ -0,0 +1,113 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import division + +import math +import unittest +import multiprocessing +import numpy as np + +import paddle.fluid as fluid +from paddle.io import IterableDataset, BatchSampler, DataLoader, get_worker_info + + +class RangeIterableDatasetSplit(IterableDataset): + def __init__(self, start, end): + self.start = start + self.end = end + + def __iter__(self): + worker_info = get_worker_info() + if worker_info is None: + iter_start = self.start + iter_end = self.end + else: + per_worker = int( + math.ceil((self.end - self.start) / float( + worker_info.num_workers))) + worker_id = worker_info.id + iter_start = self.start + worker_id * per_worker + iter_end = min(iter_start + per_worker, self.end) + + for i in range(iter_start, iter_end): + yield np.array([i]) + + +class TestDynamicDataLoaderIterSplit(unittest.TestCase): + def test_main(self): + place = fluid.CPUPlace() + with fluid.dygraph.guard(place): + dataset = RangeIterableDatasetSplit(0, 10) + dataloader = DataLoader( + dataset, + places=place, + num_workers=2, + batch_size=1, + drop_last=True) + + rets = [] + for d in dataloader: + rets.append(d[0].numpy()[0][0]) + + assert tuple(sorted(rets)) == tuple(range(0, 10)) + + +class RangeIterableDataset(IterableDataset): + def __init__(self, start, end): + self.start = start + self.end = end + + def __iter__(self): + # return iter(range(self.start, self.end)) + for i in range(self.start, self.end): + yield np.array([i]) + + +class TestDynamicDataLoaderIterInitFuncSplit(unittest.TestCase): + def test_main(self): + place = fluid.CPUPlace() + with fluid.dygraph.guard(place): + dataset = RangeIterableDataset(0, 10) + + def worker_spliter(worker_id): + worker_info = get_worker_info() + + dataset = worker_info.dataset + start = dataset.start + end = dataset.end + num_per_worker = int( + math.ceil((end - start) / float(worker_info.num_workers))) + + worker_id = worker_info.id + dataset.start = start + worker_id * num_per_worker + dataset.end = min(dataset.start + num_per_worker, end) + + dataloader = DataLoader( + dataset, + places=place, + num_workers=1, + batch_size=1, + drop_last=True, + worker_init_fn=worker_spliter) + + rets = [] + for d in dataloader: + rets.append(d[0].numpy()[0][0]) + + assert tuple(sorted(rets)) == tuple(range(0, 10)) + + +if __name__ == '__main__': + unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_iterable_dataset_static.py b/python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_iterable_dataset_static.py 
index 20d4131334543..3da7e4cbf7716 100644
--- a/python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_iterable_dataset.py
+++ b/python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_iterable_dataset_static.py
@@ -23,7 +23,7 @@
 import numpy as np
 
 import paddle.fluid as fluid
-from paddle.io import IterableDataset, BatchSampler, DataLoader
+from paddle.io import IterableDataset, BatchSampler, DataLoader, get_worker_info
 
 EPOCH_NUM = 3
 BATCH_SIZE = 8
diff --git a/python/paddle/io/__init__.py b/python/paddle/io/__init__.py
index 28b6157870a32..875f3ff2e9155 100644
--- a/python/paddle/io/__init__.py
+++ b/python/paddle/io/__init__.py
@@ -19,6 +19,7 @@
     'BatchSampler',
     # 'Transform',
     'DataLoader',
+    'get_worker_info',
     'load',
     'save',
     'load_program_state',
@@ -37,7 +38,7 @@
 ]
 
 from ..fluid.io import DataLoader
-from ..fluid.dataloader import Dataset, IterableDataset, BatchSampler
+from ..fluid.dataloader import Dataset, IterableDataset, BatchSampler, get_worker_info
 from ..fluid.io import load, save, load_program_state, set_program_state, \
     load_inference_model, save_inference_model, batch
 from ..reader import shuffle, buffered, cache, chain, firstn, compose, map_readers, xmap_readers

From 6671b7d99dc92f08f6806d729432a0fc2b8e31fd Mon Sep 17 00:00:00 2001
From: dengkaipeng
Date: Tue, 28 Jul 2020 15:30:08 +0000
Subject: [PATCH 05/17] add doc

---
 python/paddle/fluid/dataloader/dataset.py     | 96 +++++++++++++++
 python/paddle/fluid/reader.py                 |  3 +-
 ...ocess_dataloader_iterable_dataset_split.py |  2 -
 3 files changed, 98 insertions(+), 3 deletions(-)

diff --git a/python/paddle/fluid/dataloader/dataset.py b/python/paddle/fluid/dataloader/dataset.py
index f4e0e3eaeb530..f58e85ae24f24 100644
--- a/python/paddle/fluid/dataloader/dataset.py
+++ b/python/paddle/fluid/dataloader/dataset.py
@@ -111,6 +111,102 @@ def __iter__(self, idx):
         for img, lbl in iter(dataset):
             print(img, lbl)
 
+    When :attr:`num_workers > 0`, each worker has a different copy of the dataset object and
+    will yield whole dataset samples, which means samples in dataset will be repeated
+    :attr:`num_workers` times. If it is required that each sample be yielded only once, there
+    are two methods to configure a different copy of the dataset in each worker process to avoid
+    duplicate data among workers as follows. In both methods, worker information that can be
+    obtained in a worker process by `paddle.io.get_worker_info` will be needed.
+
+    Example 1: splitting data copy in each worker in :code:`__iter__`
+
+    .. 
code-block:: python + + import math + import numpy as np + import paddle.fluid as fluid + from paddle.io import IterableDataset, DataLoader, get_worker_info + + class SplitedIterableDataset(IterableDataset): + def __init__(self, start, end): + self.start = start + self.end = end + + def __iter__(self): + worker_info = get_worker_info() + if worker_info is None: + iter_start = self.start + iter_end = self.end + else: + per_worker = int( + math.ceil((self.end - self.start) / float( + worker_info.num_workers))) + worker_id = worker_info.id + iter_start = self.start + worker_id * per_worker + iter_end = min(iter_start + per_worker, self.end) + + for i in range(iter_start, iter_end): + yield np.array([i]) + + place = fluid.CPUPlace() + with fluid.dygraph.guard(place): + dataset = SplitedIterableDataset(start=2, end=9) + dataloader = DataLoader( + dataset, + places=place, + num_workers=2, + batch_size=1, + drop_last=True) + + print(list(dataloader)) + # outputs: [2, 5, 3, 6, 4, 7] + + Example 2: splitting data copy in each worker by :code:`worker_init_fn` + + .. code-block:: python + + import math + import numpy as np + import paddle.fluid as fluid + from paddle.io import IterableDataset, DataLoader + + class RangeIterableDataset(IterableDataset): + def __init__(self, start, end): + self.start = start + self.end = end + + def __iter__(self): + for i in range(self.start, self.end): + yield np.array([i]) + + place = fluid.CPUPlace() + with fluid.dygraph.guard(place): + dataset = RangeIterableDataset(start=2, end=9) + + def worker_init_fn(worker_id): + worker_info = get_worker_info() + + dataset = worker_info.dataset + start = dataset.start + end = dataset.end + num_per_worker = int( + math.ceil((end - start) / float(worker_info.num_workers))) + + worker_id = worker_info.id + dataset.start = start + worker_id * num_per_worker + dataset.end = min(dataset.start + num_per_worker, end) + + dataloader = DataLoader( + dataset, + places=place, + num_workers=2, + batch_size=1, + drop_last=True, + worker_init_fn=worker_init_fn) + + print(list(dataloader)) + # outputs: [2, 5, 3, 6, 4, 7] + """ def __init__(self): diff --git a/python/paddle/fluid/reader.py b/python/paddle/fluid/reader.py index eed96200b5d2a..8f464c6bb6852 100644 --- a/python/paddle/fluid/reader.py +++ b/python/paddle/fluid/reader.py @@ -128,7 +128,8 @@ class DataLoader(object): Args: dataset(Dataset): the dataset to load data from, should be an - instance of subclass of :code:`paddle.io.Dataset`. + instance of subclass of :code:`paddle.io.Dataset` or + :code:`paddle.io.IterableDataset`. feed_list (list(Variable)|tuple(Variable)): feed variable list. The variables should be created by :code:`fluid.data()`. 
:attr:`feed_list` must be set if :attr:`return_list` is
diff --git a/python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_iterable_dataset_split.py b/python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_iterable_dataset_split.py
index 3288d9c10231b..562051335850a 100644
--- a/python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_iterable_dataset_split.py
+++ b/python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_iterable_dataset_split.py
@@ -16,7 +16,6 @@
 
 import math
 import unittest
-import multiprocessing
 import numpy as np
 
 import paddle.fluid as fluid
@@ -70,7 +69,6 @@ def __init__(self, start, end):
         self.end = end
 
     def __iter__(self):
-        # return iter(range(self.start, self.end))
         for i in range(self.start, self.end):
             yield np.array([i])

From f1a18a5a75779de3f17f811a5495b717a43425c5 Mon Sep 17 00:00:00 2001
From: dengkaipeng
Date: Tue, 28 Jul 2020 15:47:32 +0000
Subject: [PATCH 06/17] polish comment

---
 .../fluid/dataloader/dataloader_iter.py       | 28 +++++++++++--------
 1 file changed, 17 insertions(+), 11 deletions(-)

diff --git a/python/paddle/fluid/dataloader/dataloader_iter.py b/python/paddle/fluid/dataloader/dataloader_iter.py
index 19cba27c91609..f4f90ffb3f262 100644
--- a/python/paddle/fluid/dataloader/dataloader_iter.py
+++ b/python/paddle/fluid/dataloader/dataloader_iter.py
@@ -87,6 +87,8 @@ def is_alive(self):
         return self._parent_alive
 
 
+# worker information for each worker, used for splitting data copy
+# for IterableDataset in worker processes.
 _worker_info = None
 
 
@@ -190,10 +192,7 @@ def _init_thread(self):
     def _thread_loop(self):
         try:
             for indices in self._sampler_iter:
-                # # read data from dataset in mini-batch
-                # batch = [self._dataset[i] for i in indices]
-                # if self._collate_fn is not None:
-                #     batch = self._collate_fn(batch)
+                # read data from dataset in mini-batch
                 batch = self._dataset_fetcher.fetch(indices)
 
                 # pack as LoDTensorArray
@@ -432,9 +431,6 @@ def _worker_loop(self, dataset, dataset_kind, indices_queue, out_queue,
                         batch = init_exception
                         init_exception = None
                     else:
-                        # batch = [dataset[i] for i in indices]
-                        # if self._collate_fn is not None:
-                        #     batch = self._collate_fn(batch)
                         batch = fetcher.fetch(indices)
                 except Exception as e:
                     if isinstance(
@@ -480,7 +476,6 @@ def _thread_loop(self):
                 # serializable, cannot be create in workers
                 for slot in batch:
                     if not isinstance(slot, core.LoDTensor):
-                        # self._check_input_array(slot)
                         tmp = core.LoDTensor()
                         tmp.set(slot, core.CPUPlace())
                         slot = tmp
@@ -496,6 +491,12 @@ def _thread_loop(self):
 
     def _get_data(self):
         while not self._thread_done_event.is_set():
+            # For IterableDataset, batch indices are generated infinitely
+            # for each worker to raise StopIteration, but a StopIteration
+            # raising process will discard a batch of indices which is counted
+            # in _send_idx but will not increase _rcvd_idx, so we check
+            # whether the worker is still alive here to skip the discarded
+            # batch indices and increase _rcvd_idx
             while self._rcvd_idx < self._send_idx:
                 info = self._task_infos[self._rcvd_idx]
                 if len(info) == 2 or self._worker_status[info[0]]:
@@ -504,9 +505,9 @@ def _get_data(self):
                 self._rcvd_idx += 1
                 self._batches_outstanding -= 1
             else:
-                # continue
-                # # self._shutdown_workers()
-                # # raise StopIteration
+                # NOTE: _rcvd_idx and _send_idx only record batches among
+                #       workers; if batches among workers are drained, there
+                #       may also be data in the blocking queue
                 if self._batches_outstanding < len(self._places):
                     return None
                 continue
@@ -553,6 +554,11 @@ def _get_data(self):
             else:
                 if self._dataset_kind == _DatasetKind.ITER and isinstance(
                         data, _IterableDatasetStopIteration):
+                    # if a worker gets StopIteration, we shut down this worker;
+                    # note that the batch indices that triggered StopIteration
+                    # are discarded, so the outstanding batch number should be
+                    # decreased and another batch of indices should be put, as
+                    # other workers may still be working.
                     self._shutdown_worker(data.worker_id)
                     self._batches_outstanding -= 1
                     self._try_put_indices()

From 85ac7434483b94482366ff2d8ff0d35de7c64299 Mon Sep 17 00:00:00 2001
From: dengkaipeng
Date: Thu, 30 Jul 2020 03:31:49 +0000
Subject: [PATCH 07/17] add get_worker_info doc

---
 python/paddle/fluid/dataloader/dataloader_iter.py | 13 +++++++++++++
 1 file changed, 13 insertions(+)

diff --git a/python/paddle/fluid/dataloader/dataloader_iter.py b/python/paddle/fluid/dataloader/dataloader_iter.py
index 032d6e5af1b96..c2f864f159239 100644
--- a/python/paddle/fluid/dataloader/dataloader_iter.py
+++ b/python/paddle/fluid/dataloader/dataloader_iter.py
@@ -113,6 +113,19 @@ def is_alive(self):
 
 
 def get_worker_info():
+    """
+    Get DataLoader worker process information function, this function is
+    used to split data copies in worker processes for IterableDataset
+    (see :code:`paddle.io.IterableDataset`), worker information contains
+    the following fields:
+
+    :attr:`num_workers`: total worker process number, see `paddle.io.DataLoader`
+    :attr:`id`: the worker process id, counting from 0 to :attr:`num_workers - 1`
+    :attr:`dataset`: the dataset object in this worker process
+
+    Returns:
+        WorkerInfo: an instance of WorkerInfo which contains fields above.
+    """
     return _worker_info

From 78851a3faaa1ba1999e75882b97e72f6086e6951 Mon Sep 17 00:00:00 2001
From: dengkaipeng
Date: Thu, 30 Jul 2020 12:42:51 +0000
Subject: [PATCH 08/17] fix doc. test=develop

---
 python/paddle/fluid/dataloader/dataloader_iter.py | 2 ++
 python/paddle/fluid/dataloader/dataset.py         | 6 ++----
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/python/paddle/fluid/dataloader/dataloader_iter.py b/python/paddle/fluid/dataloader/dataloader_iter.py
index c2f864f159239..55143339f8ea7 100644
--- a/python/paddle/fluid/dataloader/dataloader_iter.py
+++ b/python/paddle/fluid/dataloader/dataloader_iter.py
@@ -120,7 +120,9 @@ def get_worker_info():
     the following fields:
 
     :attr:`num_workers`: total worker process number, see `paddle.io.DataLoader`
+
     :attr:`id`: the worker process id, counting from 0 to :attr:`num_workers - 1`
+
     :attr:`dataset`: the dataset object in this worker process
 
     Returns:
diff --git a/python/paddle/fluid/dataloader/dataset.py b/python/paddle/fluid/dataloader/dataset.py
index f58e85ae24f24..dce0ad343619d 100644
--- a/python/paddle/fluid/dataloader/dataset.py
+++ b/python/paddle/fluid/dataloader/dataset.py
@@ -81,11 +81,9 @@ class IterableDataset(Dataset):
     a python iterator) should be a subclass of `paddle.io.IterableDataset`. All subclasses should
     implement the following method:
 
-    :code:`__iter__`: yield sample sequentially. This method is required by
-    reading dataset sample in :code:`paddle.io.DataLoader`.
+    :code:`__iter__`: yield sample sequentially. This method is required by reading dataset sample in :code:`paddle.io.DataLoader`.
 
-    NOTE: do not need to implement :code:`__getitem__` and :code:`__len__` in IterableDataset,
-    should not be called either.
+    NOTE: do not implement :code:`__getitem__` and :code:`__len__` in IterableDataset, they should not be called either.
 
     see :code:`paddle.io.DataLoader`. 
From a92c221171c3d92ab97614cec57cc0976e9a9d4e Mon Sep 17 00:00:00 2001 From: dengkaipeng Date: Fri, 31 Jul 2020 08:00:37 +0000 Subject: [PATCH 09/17] fix test_batch_sampler --- python/paddle/fluid/dataloader/batch_sampler.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/paddle/fluid/dataloader/batch_sampler.py b/python/paddle/fluid/dataloader/batch_sampler.py index 5347122a387b0..b4d24009b7650 100644 --- a/python/paddle/fluid/dataloader/batch_sampler.py +++ b/python/paddle/fluid/dataloader/batch_sampler.py @@ -106,6 +106,7 @@ def __init__(self, assert isinstance(indices, list) or isinstance(indices, tuple), \ "indices should be a list or tuple, but got {}".format(type(indices)) self.indices = indices + self.sampler_iter = None else: if isinstance(dataset, IterableDataset): self.sampler_iter = iter( From 789f9b44a7e94ea089bee37105a5eca27f972b4c Mon Sep 17 00:00:00 2001 From: dengkaipeng Date: Mon, 3 Aug 2020 13:13:34 +0000 Subject: [PATCH 10/17] fix unittest after merging develop. test=develop --- .../test_multiprocess_dataloader_exception.py | 13 ++++++++----- ...ltiprocess_dataloader_iterable_dataset_static.py | 10 ++++++++-- 2 files changed, 16 insertions(+), 7 deletions(-) diff --git a/python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_exception.py b/python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_exception.py index f3b15835b9e6f..8decb62d72c1f 100644 --- a/python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_exception.py +++ b/python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_exception.py @@ -144,12 +144,15 @@ def _collate_fn(sample_list): indices_queue.put([i, i + 10]) indices_queue.put(None) loader._worker_loop( - loader._dataset, indices_queue, loader._data_queue, - loader._workers_done_event, _collate_fn, _init_fn, 0) + loader._dataset, 0, indices_queue, loader._data_queue, + loader._workers_done_event, _collate_fn, _init_fn, 0, 1) self.assertTrue(False) except AssertionError: pass - except Exception: + except Exception as e: + print("Exception", e) + import sys + sys.stdout.flush() self.assertTrue(False) def run_with_worker_done(self, use_shared_memory=True): @@ -184,8 +187,8 @@ def _collate_fn(sample_list): indices_queue.put(None) loader._workers_done_event.set() loader._worker_loop( - loader._dataset, indices_queue, loader._data_queue, - loader._workers_done_event, _collate_fn, _init_fn, 0) + loader._dataset, 0, indices_queue, loader._data_queue, + loader._workers_done_event, _collate_fn, _init_fn, 0, 1) self.assertTrue(True) except AssertionError: pass diff --git a/python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_iterable_dataset_static.py b/python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_iterable_dataset_static.py index 3da7e4cbf7716..2d011ae18ff3c 100644 --- a/python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_iterable_dataset_static.py +++ b/python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_iterable_dataset_static.py @@ -136,8 +136,14 @@ def run_main(self, num_workers, places): label = item['label'] assert image.shape() == [BATCH_SIZE, IMAGE_SIZE] assert label.shape() == [BATCH_SIZE, 1] - assert image._place()._equals(places[i]) - assert label._place()._equals(places[i]) + if places[i]._equals(fluid.CPUPlace()): + assert image._place()._equals(fluid.CPUPlace()) + assert label._place()._equals(fluid.CPUPlace()) + else: + assert image._place()._equals(fluid.CUDAPinnedPlace( + )) + assert label._place()._equals(fluid.CUDAPinnedPlace( + )) L, = 
exe.run(program=prog, feed=d, fetch_list=[loss],
 
From 11fa586f907bc6e9fcc892fd0b53f3b7dc4ea019 Mon Sep 17 00:00:00 2001
From: dengkaipeng
Date: Mon, 3 Aug 2020 16:10:01 +0000
Subject: [PATCH 11/17] fix sample code. test=develop

---
 python/paddle/fluid/dataloader/dataset.py | 8 ++++----
 python/paddle/fluid/reader.py             | 4 ++++
 2 files changed, 8 insertions(+), 4 deletions(-)

diff --git a/python/paddle/fluid/dataloader/dataset.py b/python/paddle/fluid/dataloader/dataset.py
index dce0ad343619d..09b1edf1248b6 100644
--- a/python/paddle/fluid/dataloader/dataset.py
+++ b/python/paddle/fluid/dataloader/dataset.py
@@ -99,14 +99,14 @@ class RandomDataset(Dataset):
             def __init__(self, num_samples):
                 self.num_samples = num_samples
 
-            def __iter__(self, idx):
-                for i in range(self.num_samples)
+            def __iter__(self):
+                for i in range(self.num_samples):
                     image = np.random.random([784]).astype('float32')
                     label = np.random.randint(0, 9, (1, )).astype('int64')
                     yield image, label
 
         dataset = RandomDataset(10)
-        for img, lbl in iter(dataset):
+        for img, lbl in dataset:
             print(img, lbl)
 
     When :attr:`num_workers > 0`, each worker has a different copy of the dataset object and
@@ -166,7 +166,7 @@ def __iter__(self):
             import math
             import numpy as np
             import paddle.fluid as fluid
-            from paddle.io import IterableDataset, DataLoader
+            from paddle.io import IterableDataset, DataLoader, get_worker_info
 
             class RangeIterableDataset(IterableDataset):
                 def __init__(self, start, end):
diff --git a/python/paddle/fluid/reader.py b/python/paddle/fluid/reader.py
index 8453d661504f2..8f2b4a1b831be 100644
--- a/python/paddle/fluid/reader.py
+++ b/python/paddle/fluid/reader.py
@@ -288,6 +288,10 @@ def forward(self, image, label=None):
 
     #  -------------------------------------------------------
 
+    Note:
+        For reading iterable dataset with multiprocess DataLoader,
+        please see :code:`paddle.io.IterableDataset`
+
     """
 
     def __init__(self,
From 2f5ee04b2d0ac815b758509b4eadb1dd5ddc0155 Mon Sep 17 00:00:00 2001
From: dengkaipeng
Date: Thu, 6 Aug 2020 12:27:44 +0000
Subject: [PATCH 12/17] fix according to reviews. 
test=develop

---
 python/paddle/fluid/dataloader/dataloader_iter.py |  4 ++--
 python/paddle/fluid/dataloader/dataset.py         | 13 +++++++------
 python/paddle/fluid/reader.py                     |  4 ++--
 3 files changed, 11 insertions(+), 10 deletions(-)

diff --git a/python/paddle/fluid/dataloader/dataloader_iter.py b/python/paddle/fluid/dataloader/dataloader_iter.py
index 55143339f8ea7..6be24c1a2c377 100644
--- a/python/paddle/fluid/dataloader/dataloader_iter.py
+++ b/python/paddle/fluid/dataloader/dataloader_iter.py
@@ -115,7 +115,7 @@ def is_alive(self):
 def get_worker_info():
     """
     Get DataLoader worker process information function, this function is
-    used to splitd data copy in worker process for IterableDataset
+    used to split data copy in worker process for IterableDataset
     (see :code:`paddle.io.IterableDataset`), worker informations contains
     following fields:
@@ -526,7 +526,7 @@ def _thread_loop(self):
 
     def _get_data(self):
         while not self._thread_done_event.is_set():
-            # For IterableDataset, batch indices is generate infinitely
+            # For IterableDataset, batch indices are generated infinitely
             # for each worker to raise StopIteration, but a StopIteration
             # raising process will discard a batch of indices which is counted
             # in _send_idx but will not increase _rcvd_idx, so we check
diff --git a/python/paddle/fluid/dataloader/dataset.py b/python/paddle/fluid/dataloader/dataset.py
index 09b1edf1248b6..5c5dc5730c597 100644
--- a/python/paddle/fluid/dataloader/dataset.py
+++ b/python/paddle/fluid/dataloader/dataset.py
@@ -21,7 +21,7 @@
 
 class Dataset(object):
     """
-    An abstract class to encapsulates methods and behaviors of datasets.
+    An abstract class to encapsulate methods and behaviors of datasets.
 
     All datasets in map-style(dataset samples can be get by a given key)
     should be a subclass of `paddle.io.Dataset`. All subclasses should
@@ -75,15 +75,16 @@ def __len__(self):
 
 class IterableDataset(Dataset):
     """
-    An abstract class to encapsulates methods and behaviors of iterable datasets.
+    An abstract class to encapsulate methods and behaviors of iterable datasets.
 
-    All datasets in iterable-style(can only get sample one by one sequentially, like
-    a python iterater) should be a subclass of `paddle.io.IterableDataset`. All subclasses should
+    All datasets in iterable-style (can only get sample one by one sequentially, like
+    a Python iterator) should be a subclass of `paddle.io.IterableDataset`. All subclasses should
     implement following methods:
 
     :code:`__iter__`: yield samples sequentially. This method is required for reading dataset samples in :code:`paddle.io.DataLoader`.
 
-    NOTE: do not implement :code:`__getitem__` and :code:`__len__` in IterableDataset, should not be called either.
+    .. note::
+        do not implement :code:`__getitem__` and :code:`__len__` in IterableDataset; they should not be called either.
 
     see :code:`paddle.io.DataLoader`.
 
@@ -110,7 +111,7 @@ def __iter__(self):
             print(img, lbl)
 
     When :attr:`num_workers > 0`, each worker has a different copy of the dataset object and
-    will yield whole dataset samples, which means samples in dataset will be repeat in
+    will yield whole dataset samples, which means samples in dataset will be repeated in
     :attr:`num_workers` times. If it is require that each sample to be yield only once, there
     are two methods to configure different copy in each worker process to avoid duplicate data
     among workers as follows. 
In both the two methods, worker information that can be get in
diff --git a/python/paddle/fluid/reader.py b/python/paddle/fluid/reader.py
index 8f2b4a1b831be..cb4865491e670 100644
--- a/python/paddle/fluid/reader.py
+++ b/python/paddle/fluid/reader.py
@@ -288,7 +288,7 @@ def forward(self, image, label=None):
 
     #  -------------------------------------------------------
 
-    Note:
+    .. note::
         For reading iterable dataset with multiprocess DataLoader,
         please see :code:`paddle.io.IterableDataset`
 
@@ -353,7 +353,7 @@ def __init__(self,
                     format(shuffle))
                 if batch_sampler is not None:
                     raise ValueError(
-                        "IterableDataset expect unspecified batch_sample")
+                        "IterableDataset expect unspecified batch_sampler")
             else:
                 self.dataset_kind = _DatasetKind.MAP
 
From 447573b23fb39b72d0881282afc35fe9f84e950a Mon Sep 17 00:00:00 2001
From: dengkaipeng
Date: Fri, 7 Aug 2020 10:52:15 +0000
Subject: [PATCH 13/17] add sample code for get_worker_info. test=develop

---
 .../fluid/dataloader/dataloader_iter.py       | 47 +++++++++++++++++++
 1 file changed, 47 insertions(+)

diff --git a/python/paddle/fluid/dataloader/dataloader_iter.py b/python/paddle/fluid/dataloader/dataloader_iter.py
index 6be24c1a2c377..9fbb497062eab 100644
--- a/python/paddle/fluid/dataloader/dataloader_iter.py
+++ b/python/paddle/fluid/dataloader/dataloader_iter.py
@@ -127,6 +127,53 @@ def get_worker_info():
 
     Returns:
         WorkerInfo: an instance of WorkerInfo which contains the fields above.
+
+    .. note::
+        For more usage and examples, please see :code:`paddle.io.IterableDataset`
+
+    Example:
+
+        .. code-block:: python
+
+            import math
+            import numpy as np
+            import paddle.fluid as fluid
+            from paddle.io import IterableDataset, DataLoader, get_worker_info
+
+            class SplitedIterableDataset(IterableDataset):
+                def __init__(self, start, end):
+                    self.start = start
+                    self.end = end
+
+                def __iter__(self):
+                    worker_info = get_worker_info()
+                    if worker_info is None:
+                        iter_start = self.start
+                        iter_end = self.end
+                    else:
+                        per_worker = int(
+                            math.ceil((self.end - self.start) / float(
+                                worker_info.num_workers)))
+                        worker_id = worker_info.id
+                        iter_start = self.start + worker_id * per_worker
+                        iter_end = min(iter_start + per_worker, self.end)
+
+                    for i in range(iter_start, iter_end):
+                        yield np.array([i])
+
+            place = fluid.CPUPlace()
+            with fluid.dygraph.guard(place):
+                dataset = SplitedIterableDataset(start=2, end=9)
+                dataloader = DataLoader(
+                    dataset,
+                    places=place,
+                    num_workers=2,
+                    batch_size=1,
+                    drop_last=True)
+
+                print(list(dataloader))
+                # outputs: [2, 5, 3, 6, 4, 7]
+
     """
     return _worker_info
 
From 200c5d82ad75d09a5ac0a87435a2d584e8b4d489 Mon Sep 17 00:00:00 2001
From: dengkaipeng
Date: Sat, 8 Aug 2020 14:09:14 +0000
Subject: [PATCH 14/17] add runtime error test. 
test=develop

---
 .../test_multiprocess_dataloader_exception.py | 44 ++++++++++++++++++-
 ...cess_dataloader_iterable_dataset_static.py |  4 +-
 2 files changed, 45 insertions(+), 3 deletions(-)

diff --git a/python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_exception.py b/python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_exception.py
index 8decb62d72c1f..3a8867f6bd29f 100644
--- a/python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_exception.py
+++ b/python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_exception.py
@@ -24,7 +24,7 @@
 import paddle.fluid as fluid
 import paddle.fluid.core as core
-from paddle.io import Dataset, BatchSampler, DataLoader
+from paddle.io import Dataset, IterableDataset, BatchSampler, DataLoader
 from paddle.fluid.dygraph.nn import Linear
 from paddle.fluid.dygraph.base import to_variable
 
@@ -108,6 +108,48 @@ def test_main(self):
             self.assertTrue(False)
 
 
+class TestDatasetRuntimeError(unittest.TestCase):
+    def test_main(self):
+        dataset = Dataset()
+
+        # __getitem__ not implemented
+        try:
+            d = dataset[0]
+            self.assertTrue(False)
+        except NotImplementedError:
+            pass
+
+        # __len__ not implemented
+        try:
+            l = len(dataset)
+            self.assertTrue(False)
+        except NotImplementedError:
+            pass
+
+        dataset = IterableDataset()
+
+        # __iter__ not implemented
+        try:
+            d = iter(dataset)
+            self.assertTrue(False)
+        except NotImplementedError:
+            pass
+
+        # __getitem__ raises RuntimeError for IterableDataset
+        try:
+            d = dataset[0]
+            self.assertTrue(False)
+        except RuntimeError:
+            pass
+
+        # __len__ raises RuntimeError for IterableDataset
+        try:
+            l = len(dataset)
+            self.assertTrue(False)
+        except RuntimeError:
+            pass
+
+
 # CI Converage cannot record stub in subprocess,
 # HACK a _worker_loop in main process call here
 @unittest.skipIf(not core.is_compiled_with_cuda(),
diff --git a/python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_iterable_dataset_static.py b/python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_iterable_dataset_static.py
index 2d011ae18ff3c..87fc1b3e6042c 100644
--- a/python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_iterable_dataset_static.py
+++ b/python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_iterable_dataset_static.py
@@ -25,10 +25,10 @@
 import paddle.fluid as fluid
 from paddle.io import IterableDataset, BatchSampler, DataLoader, get_worker_info
 
-EPOCH_NUM = 3
+EPOCH_NUM = 2
 BATCH_SIZE = 8
 IMAGE_SIZE = 32
-SAMPLE_NUM = 100
+SAMPLE_NUM = 80
 CLASS_NUM = 10
 
From 4120baaccee9bfdcab232b39454f10a2c6e6a025 Mon Sep 17 00:00:00 2001
From: dengkaipeng
Date: Mon, 10 Aug 2020 07:37:13 +0000
Subject: [PATCH 15/17] fix unittest after merging develop. 
test=develop --- ..._multiprocess_dataloader_iterable_dataset_static.py | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_iterable_dataset_static.py b/python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_iterable_dataset_static.py index 87fc1b3e6042c..e64e11d156ec7 100644 --- a/python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_iterable_dataset_static.py +++ b/python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_iterable_dataset_static.py @@ -136,14 +136,8 @@ def run_main(self, num_workers, places): label = item['label'] assert image.shape() == [BATCH_SIZE, IMAGE_SIZE] assert label.shape() == [BATCH_SIZE, 1] - if places[i]._equals(fluid.CPUPlace()): - assert image._place()._equals(fluid.CPUPlace()) - assert label._place()._equals(fluid.CPUPlace()) - else: - assert image._place()._equals(fluid.CUDAPinnedPlace( - )) - assert label._place()._equals(fluid.CUDAPinnedPlace( - )) + assert image._place()._equals(places[i]) + assert label._place()._equals(places[i]) L, = exe.run(program=prog, feed=d, fetch_list=[loss], From 6bd39599c75fff8592016a11adb02df56611e7ae Mon Sep 17 00:00:00 2001 From: dengkaipeng Date: Mon, 10 Aug 2020 13:43:45 +0000 Subject: [PATCH 16/17] fix doc. test=develop --- python/paddle/fluid/dataloader/batch_sampler.py | 2 +- python/paddle/fluid/dataloader/dataloader_iter.py | 2 +- python/paddle/fluid/dataloader/dataset.py | 2 +- python/paddle/fluid/reader.py | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/python/paddle/fluid/dataloader/batch_sampler.py b/python/paddle/fluid/dataloader/batch_sampler.py index b4d24009b7650..811468c523b2f 100644 --- a/python/paddle/fluid/dataloader/batch_sampler.py +++ b/python/paddle/fluid/dataloader/batch_sampler.py @@ -159,7 +159,7 @@ class _InfiniteIterableSampler(object): def __init__(self, dataset, batch_size=1): assert isinstance( dataset, IterableDataset - ), "dataset should be an instnace of paddle.io.IterableDataset" + ), "dataset should be an instance of paddle.io.IterableDataset" self.dataset = dataset self.batch_size = batch_size diff --git a/python/paddle/fluid/dataloader/dataloader_iter.py b/python/paddle/fluid/dataloader/dataloader_iter.py index 8c05c69f2ed6c..5cb831eee3a4b 100644 --- a/python/paddle/fluid/dataloader/dataloader_iter.py +++ b/python/paddle/fluid/dataloader/dataloader_iter.py @@ -116,7 +116,7 @@ def get_worker_info(): """ Get DataLoader worker process information function, this function is used to split data copy in worker process for IterableDataset - (see :code:`paddle.io.IterableDataset`), worker informations contains + (see :code:`paddle.io.IterableDataset`), worker information contains following fields: :attr:`num_workers`: total worker process number, see `paddle.io.DataLoader` diff --git a/python/paddle/fluid/dataloader/dataset.py b/python/paddle/fluid/dataloader/dataset.py index 5c5dc5730c597..5ea2d4cdc80d0 100644 --- a/python/paddle/fluid/dataloader/dataset.py +++ b/python/paddle/fluid/dataloader/dataset.py @@ -112,7 +112,7 @@ def __iter__(self): When :attr:`num_workers > 0`, each worker has a different copy of the dataset object and will yield whole dataset samples, which means samples in dataset will be repeated in - :attr:`num_workers` times. If it is require that each sample to be yield only once, there + :attr:`num_workers` times. 
If each sample is required to be yielded only once, there
     are two methods to configure different copy in each worker process to avoid duplicate data
     among workers as follows. In both the two methods, worker information that can be get in
     a worker process by `paddle.io.get_worker_info` will be needed.
diff --git a/python/paddle/fluid/reader.py b/python/paddle/fluid/reader.py
index f34f7a3a3bc0f..cd699d3325482 100644
--- a/python/paddle/fluid/reader.py
+++ b/python/paddle/fluid/reader.py
@@ -139,7 +139,7 @@ class DataLoader(object):
         dataset(Dataset): the dataset to load data from, should be an
             instance of subclass of :code:`paddle.io.Dataset` or
             :code:`paddle.io.IterableDataset`.
-        feed_list (list(Variable)|tuple(Variable)): feed variable list.
+        feed_list (list(Tensor)|tuple(Tensor)): feed variable list.
             The variables should be created by :code:`fluid.data()`.
             :attr:`feed_list` must be set if :attr:`return_list` is
             False. Default None.
From 751fca0d8643449f2ef1ba62b2c395f50464f926 Mon Sep 17 00:00:00 2001
From: dengkaipeng
Date: Mon, 10 Aug 2020 13:46:02 +0000
Subject: [PATCH 17/17] fix doc. test=develop

---
 python/paddle/fluid/dataloader/dataset.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python/paddle/fluid/dataloader/dataset.py b/python/paddle/fluid/dataloader/dataset.py
index 5ea2d4cdc80d0..e47f57381c0de 100644
--- a/python/paddle/fluid/dataloader/dataset.py
+++ b/python/paddle/fluid/dataloader/dataset.py
@@ -114,7 +114,7 @@ def __iter__(self):
     will yield whole dataset samples, which means samples in dataset will be repeated in
     :attr:`num_workers` times. If each sample is required to be yielded only once, there
     are two methods to configure different copy in each worker process to avoid duplicate data
-    among workers as follows. In both the two methods, worker information that can be get in
+    among workers as follows. In both methods, worker information that can be obtained in
     a worker process by `paddle.io.get_worker_info` will be needed.
 
     Example 1: splitting data copy in each worker in :code:`__iter__`
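For reference, one way the Example 1 pattern can be written is sketched below. It is the same even-split approach as the `SplitedIterableDataset` sample added to the `get_worker_info` docstring in PATCH 13: each worker computes a `ceil((end - start) / num_workers)`-sized slice of the range and yields only its own slice, so no sample is duplicated across workers:

.. code-block:: python

    import math
    import numpy as np
    import paddle.fluid as fluid
    from paddle.io import IterableDataset, DataLoader, get_worker_info

    class SplitedIterableDataset(IterableDataset):
        def __init__(self, start, end):
            self.start = start
            self.end = end

        def __iter__(self):
            worker_info = get_worker_info()
            if worker_info is None:
                # single-process loading: yield the whole range
                iter_start = self.start
                iter_end = self.end
            else:
                # split [start, end) evenly and keep only this worker's slice
                per_worker = int(
                    math.ceil((self.end - self.start) / float(
                        worker_info.num_workers)))
                worker_id = worker_info.id
                iter_start = self.start + worker_id * per_worker
                iter_end = min(iter_start + per_worker, self.end)

            for i in range(iter_start, iter_end):
                yield np.array([i])

    place = fluid.CPUPlace()
    with fluid.dygraph.guard(place):
        dataset = SplitedIterableDataset(start=2, end=9)
        dataloader = DataLoader(
            dataset, places=place, num_workers=2, batch_size=1, drop_last=True)
        # each of the 7 samples appears exactly once across both workers
        print(list(dataloader))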