From a79fbc0957ed2a499a8f8822b96f7c2fa471ff61 Mon Sep 17 00:00:00 2001 From: yaoxuefeng6 Date: Mon, 7 Sep 2020 17:33:54 +0800 Subject: [PATCH 01/14] refine fleet dataset class api --- .../distributed/fleet/dataset/dataset.py | 68 ++++++++++++++++--- 1 file changed, 60 insertions(+), 8 deletions(-) diff --git a/python/paddle/distributed/fleet/dataset/dataset.py b/python/paddle/distributed/fleet/dataset/dataset.py index f6504cacd9680..9eda6e69cf017 100644 --- a/python/paddle/distributed/fleet/dataset/dataset.py +++ b/python/paddle/distributed/fleet/dataset/dataset.py @@ -65,7 +65,16 @@ def create_dataset(self, datafeed_class="QueueDataset"): class DatasetBase(object): """ Base dataset class. """ - def __init__(self): + def __init__(self, + batch_size=batch_size, + thread_num=thread_num, + use_var=use_var, + pipe_command=pipe_command, + input_type=input_type, + fs_name=fs_name, + fs_ugi=fs_ugi, + download_cmd=download_cmd, + eval_candidate_size=eval_candidate_size): """ Init. """ # define class name here # to decide whether we need create in memory instance @@ -74,6 +83,15 @@ def __init__(self): self.dataset = core.Dataset("MultiSlotDataset") self.thread_num = 1 self.filelist = [] + self.set_batch_size(batch_size) + self.set_thread(thread_num) + self.set_use_var(use_var) + self.set_pipe_command(pipe_command) + self.set_input_type(input_type) + self.set_hdfs_config(fs_name, fs_ugi) + if eval_candidate_size > 0: + self.set_fea_eval(eval_candidate_size, True) + self.set_download_cmd(download_cmd) def set_pipe_command(self, pipe_command): """ @@ -297,13 +315,13 @@ def _prepare_to_run(self): if self.thread_num > len(self.filelist): self.thread_num = len(self.filelist) self.dataset.set_thread_num(self.thread_num) - self.dataset.set_data_feed_desc(self.desc()) + self.dataset.set_data_feed_desc(self._desc()) self.dataset.create_readers() def _finish_to_run(self): self.dataset.destroy_readers() - def desc(self): + def _desc(self): """ Returns a protobuf message for this DataFeedDesc @@ -312,7 +330,7 @@ def desc(self): import paddle.fluid as fluid dataset = fluid.DatasetFactory().create_dataset() - print(dataset.desc()) + print(dataset._desc()) Returns: A string message @@ -336,9 +354,35 @@ class InMemoryDataset(DatasetBase): dataset = paddle.fluid.DatasetFactory().create_dataset("InMemoryDataset") """ - def __init__(self): + def __init__(self, + batch_size=1, + thread_num=1, + use_var=[], + pipe_command="cat", + input_type=0, + fs_name="", + fs_ugi="", + download_cmd="cat", + eval_candidate_size=-1, + data_feed_type="MultiSlotInMemoryDataFeed", + queue_num=-1, + parse_ins_id=False, + parse_content=False, + fleet_send_batch_size=1024, + fleet_send_sleep_seconds=0, + merge_by_lineid=False, + merge_size=2): """ Init. 
""" - super(InMemoryDataset, self).__init__() + super(InMemoryDataset, self).__init__( + batch_size=batch_size, + thread_num=thread_num, + use_var=use_var, + pipe_command=pipe_command, + input_type=input_type, + fs_name=fs_name, + fs_ugi=fs_ugi, + download_cmd=download_cmd, + eval_candidate_size=eval_candidate_size) self.proto_desc.name = "MultiSlotInMemoryDataFeed" self.fleet_send_batch_size = None self.is_user_set_queue_num = False @@ -350,6 +394,14 @@ def __init__(self): self.enable_pv_merge = False self.merge_by_lineid = False self.fleet_send_sleep_seconds = None + self.set_feed_type(data_feed_type) + self.set_queue_num(queue_num) + self.set_parse_ins_id(parse_ins_id) + self.set_parse_content(parse_content) + self.set_fleet_send_batch_size(fleet_send_batch_size) + self.set_fleet_send_sleep_seconds(fleet_send_sleep_seconds) + if merge_by_lineid: + self.set_merge_by_lineid(merge_size) def set_feed_type(self, data_feed_type): """ @@ -373,7 +425,7 @@ def _prepare_to_run(self): self.dataset.set_parse_logkey(self.parse_logkey) self.dataset.set_merge_by_sid(self.merge_by_sid) self.dataset.set_enable_pv_merge(self.enable_pv_merge) - self.dataset.set_data_feed_desc(self.desc()) + self.dataset.set_data_feed_desc(self._desc()) self.dataset.create_channel() self.dataset.create_readers() @@ -881,7 +933,7 @@ def _prepare_to_run(self): self.thread_num = 1 self.dataset.set_thread_num(self.thread_num) self.dataset.set_filelist(self.filelist) - self.dataset.set_data_feed_desc(self.desc()) + self.dataset.set_data_feed_desc(self._desc()) self.dataset.create_readers() def local_shuffle(self): From 6ede4392cb5e6c70499082a5d8ea7e9af419fa5e Mon Sep 17 00:00:00 2001 From: yaoxuefeng6 Date: Tue, 8 Sep 2020 15:45:52 +0800 Subject: [PATCH 02/14] update --- .../distributed/fleet/dataset/dataset.py | 196 ++++++++++-------- .../fluid/tests/unittests/test_dataset.py | 13 +- 2 files changed, 114 insertions(+), 95 deletions(-) diff --git a/python/paddle/distributed/fleet/dataset/dataset.py b/python/paddle/distributed/fleet/dataset/dataset.py index 9eda6e69cf017..dc6bcdd284b30 100644 --- a/python/paddle/distributed/fleet/dataset/dataset.py +++ b/python/paddle/distributed/fleet/dataset/dataset.py @@ -65,16 +65,7 @@ def create_dataset(self, datafeed_class="QueueDataset"): class DatasetBase(object): """ Base dataset class. """ - def __init__(self, - batch_size=batch_size, - thread_num=thread_num, - use_var=use_var, - pipe_command=pipe_command, - input_type=input_type, - fs_name=fs_name, - fs_ugi=fs_ugi, - download_cmd=download_cmd, - eval_candidate_size=eval_candidate_size): + def __init__(self): """ Init. 
""" # define class name here # to decide whether we need create in memory instance @@ -83,14 +74,22 @@ def __init__(self, self.dataset = core.Dataset("MultiSlotDataset") self.thread_num = 1 self.filelist = [] + + def init(self, + batch_size=1, + thread_num=1, + use_var=[], + pipe_command="cat", + input_type=0, + fs_name="", + fs_ugi="", + download_cmd="cat"): self.set_batch_size(batch_size) self.set_thread(thread_num) self.set_use_var(use_var) self.set_pipe_command(pipe_command) self.set_input_type(input_type) self.set_hdfs_config(fs_name, fs_ugi) - if eval_candidate_size > 0: - self.set_fea_eval(eval_candidate_size, True) self.set_download_cmd(download_cmd) def set_pipe_command(self, pipe_command): @@ -128,51 +127,6 @@ def set_rank_offset(self, rank_offset): """ self.proto_desc.rank_offset = rank_offset - def set_fea_eval(self, record_candidate_size, fea_eval=True): - """ - set fea eval mode for slots shuffle to debug the importance level of - slots(features), fea_eval need to be set True for slots shuffle. - - Args: - record_candidate_size(int): size of instances candidate to shuffle - one slot - fea_eval(bool): whether enable fea eval mode to enable slots shuffle. - default is True. - - Examples: - .. code-block:: python - - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") - dataset.set_fea_eval(1000000, True) - - """ - if fea_eval: - self.dataset.set_fea_eval(fea_eval, record_candidate_size) - self.fea_eval = fea_eval - - def slots_shuffle(self, slots): - """ - Slots Shuffle - Slots Shuffle is a shuffle method in slots level, which is usually used - in sparse feature with large scale of instances. To compare the metric, i.e. - auc while doing slots shuffle on one or several slots with baseline to - evaluate the importance level of slots(features). - - Args: - slots(list[string]): the set of slots(string) to do slots shuffle. - - Examples: - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") - dataset.set_merge_by_lineid() - #suppose there is a slot 0 - dataset.slots_shuffle(['0']) - """ - if self.fea_eval: - slots_set = set(slots) - self.dataset.slots_shuffle(slots_set) - def set_batch_size(self, batch_size): """ Set batch size. Will be effective during training @@ -354,35 +308,9 @@ class InMemoryDataset(DatasetBase): dataset = paddle.fluid.DatasetFactory().create_dataset("InMemoryDataset") """ - def __init__(self, - batch_size=1, - thread_num=1, - use_var=[], - pipe_command="cat", - input_type=0, - fs_name="", - fs_ugi="", - download_cmd="cat", - eval_candidate_size=-1, - data_feed_type="MultiSlotInMemoryDataFeed", - queue_num=-1, - parse_ins_id=False, - parse_content=False, - fleet_send_batch_size=1024, - fleet_send_sleep_seconds=0, - merge_by_lineid=False, - merge_size=2): + def __init__(self): """ Init. 
""" - super(InMemoryDataset, self).__init__( - batch_size=batch_size, - thread_num=thread_num, - use_var=use_var, - pipe_command=pipe_command, - input_type=input_type, - fs_name=fs_name, - fs_ugi=fs_ugi, - download_cmd=download_cmd, - eval_candidate_size=eval_candidate_size) + super(InMemoryDataset, self).__init__() self.proto_desc.name = "MultiSlotInMemoryDataFeed" self.fleet_send_batch_size = None self.is_user_set_queue_num = False @@ -394,13 +322,54 @@ def __init__(self, self.enable_pv_merge = False self.merge_by_lineid = False self.fleet_send_sleep_seconds = None + + def init(self, **kwargs): + """ + should be called only once in user's python scripts to initialize seetings of dataset instance + """ + batch_size = kwargs.get("batch_size", 1) + thread_num = kwargs.get("thread_num", 1) + use_var = kwargs.get("use_var", []) + input_type = kwargs.get("input_type", 0) + fs_name = kwargs.get("fs_name", "") + fs_ugi = kwargs.get("fs_ugi", "") + pipe_command = kwargs.get("pipe_command", "cat") + download_cmd = kwargs.get("download_cmd", "cat") + + super(InMemoryDataset, self).init( + batch_size=batch_size, + thread_num=thread_num, + use_var=use_var, + pipe_command=pipe_command, + input_type=input_type, + fs_name=fs_name, + fs_ugi=fs_ugi, + download_cmd=download_cmd) + + data_feed_type = kwargs.get("data_feed_type", + "MultiSlotInMemoryDataFeed") self.set_feed_type(data_feed_type) - self.set_queue_num(queue_num) - self.set_parse_ins_id(parse_ins_id) - self.set_parse_content(parse_content) - self.set_fleet_send_batch_size(fleet_send_batch_size) - self.set_fleet_send_sleep_seconds(fleet_send_sleep_seconds) - if merge_by_lineid: + + if kwargs.get("queue_num", -1) > 0: + queue_num = kwargs.get("queue_num", -1) + self.set_queue_num(queue_num) + parse_ins_id = kwargs.get("parse_ins_id", False) + if parse_ins_id: + self.set_parse_ins_id(parse_ins_id) + + parse_content = kwargs.get("parse_content", False) + if parse_content: + self.set_parse_content(parse_content) + + fleet_send_batch_size = kwargs.get("fleet_send_batch_size", None) + if fleet_send_batch_size: + self.set_fleet_send_batch_size(fleet_send_batch_size) + + fleet_send_sleep_seconds = kwargs.get("fleet_send_sleep_seconds", None) + if fleet_send_batch_size: + self.set_fleet_send_sleep_seconds(fleet_send_sleep_seconds) + merge_size = kwargs.get("merge_size", -1) + if merge_size > 0: self.set_merge_by_lineid(merge_size) def set_feed_type(self, data_feed_type): @@ -901,6 +870,51 @@ def get_shuffle_data_size(self, fleet=None): return global_data_size[0] return local_data_size[0] + def set_fea_eval(self, record_candidate_size, fea_eval=True): + """ + set fea eval mode for slots shuffle to debug the importance level of + slots(features), fea_eval need to be set True for slots shuffle. + + Args: + record_candidate_size(int): size of instances candidate to shuffle + one slot + fea_eval(bool): whether enable fea eval mode to enable slots shuffle. + default is True. + + Examples: + .. code-block:: python + + import paddle.fluid as fluid + dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") + dataset.set_fea_eval(1000000, True) + + """ + if fea_eval: + self.dataset.set_fea_eval(fea_eval, record_candidate_size) + self.fea_eval = fea_eval + + def slots_shuffle(self, slots): + """ + Slots Shuffle + Slots Shuffle is a shuffle method in slots level, which is usually used + in sparse feature with large scale of instances. To compare the metric, i.e. 
+ auc while doing slots shuffle on one or several slots with baseline to + evaluate the importance level of slots(features). + + Args: + slots(list[string]): the set of slots(string) to do slots shuffle. + + Examples: + import paddle.fluid as fluid + dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") + dataset.set_merge_by_lineid() + #suppose there is a slot 0 + dataset.slots_shuffle(['0']) + """ + if self.fea_eval: + slots_set = set(slots) + self.dataset.slots_shuffle(slots_set) + class QueueDataset(DatasetBase): """ diff --git a/python/paddle/fluid/tests/unittests/test_dataset.py b/python/paddle/fluid/tests/unittests/test_dataset.py index 582bb3dcc6819..e7dc35ad9a088 100644 --- a/python/paddle/fluid/tests/unittests/test_dataset.py +++ b/python/paddle/fluid/tests/unittests/test_dataset.py @@ -951,14 +951,19 @@ def test_dataset_fleet2(self): exe.run(startup_program) dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( "InMemoryDataset") - dataset.set_batch_size(32) - dataset.set_thread(3) + dataset.init( + batch_size=32, + thread_num=3, + pipe_command="cat", + use_var=slots_vars) + #dataset.set_batch_size(32) + #dataset.set_thread(3) dataset.set_filelist([ "test_in_memory_dataset2_run2_a.txt", "test_in_memory_dataset2_run2_b.txt" ]) - dataset.set_pipe_command("cat") - dataset.set_use_var(slots_vars) + #dataset.set_pipe_command("cat") + #dataset.set_use_var(slots_vars) dataset.load_into_memory() try: dataset.global_shuffle(fleet) From a1afee747656ffb407b4bd49f448eab2d12a091f Mon Sep 17 00:00:00 2001 From: yaoxuefeng6 Date: Tue, 8 Sep 2020 18:20:14 +0800 Subject: [PATCH 03/14] hide some apis --- .../distributed/fleet/dataset/dataset.py | 395 +++++++++--------- python/paddle/fluid/reader.py | 4 +- .../fluid/tests/unittests/dist_fleet_ctr.py | 13 +- .../tests/unittests/dist_fleet_ctr_ps_gpu.py | 8 +- .../tests/unittests/dist_fleet_heter_ctr.py | 8 +- .../fluid/tests/unittests/test_dataset.py | 149 +++---- .../unittests/test_dataset_dataloader.py | 6 +- .../tests/unittests/test_fleet_rolemaker_2.py | 2 +- .../fluid/tests/unittests/test_monitor.py | 8 +- 9 files changed, 304 insertions(+), 289 deletions(-) diff --git a/python/paddle/distributed/fleet/dataset/dataset.py b/python/paddle/distributed/fleet/dataset/dataset.py index dc6bcdd284b30..051af970a4d8a 100644 --- a/python/paddle/distributed/fleet/dataset/dataset.py +++ b/python/paddle/distributed/fleet/dataset/dataset.py @@ -84,15 +84,15 @@ def init(self, fs_name="", fs_ugi="", download_cmd="cat"): - self.set_batch_size(batch_size) - self.set_thread(thread_num) - self.set_use_var(use_var) - self.set_pipe_command(pipe_command) - self.set_input_type(input_type) - self.set_hdfs_config(fs_name, fs_ugi) - self.set_download_cmd(download_cmd) + self._set_batch_size(batch_size) + self._set_thread(thread_num) + self._set_use_var(use_var) + self._set_pipe_command(pipe_command) + self._set_input_type(input_type) + self._set_hdfs_config(fs_name, fs_ugi) + self._set_download_cmd(download_cmd) - def set_pipe_command(self, pipe_command): + def _set_pipe_command(self, pipe_command): """ Set pipe command of current dataset A pipe command is a UNIX pipeline command that can be used only @@ -110,24 +110,7 @@ def set_pipe_command(self, pipe_command): """ self.proto_desc.pipe_command = pipe_command - def set_rank_offset(self, rank_offset): - """ - Set rank_offset for merge_pv. It set the message of Pv. - - Examples: - .. 
code-block:: python - - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset() - dataset.set_rank_offset("rank_offset") - - Args: - rank_offset(str): rank_offset's name - - """ - self.proto_desc.rank_offset = rank_offset - - def set_batch_size(self, batch_size): + def _set_batch_size(self, batch_size): """ Set batch size. Will be effective during training @@ -136,7 +119,7 @@ def set_batch_size(self, batch_size): import paddle.fluid as fluid dataset = fluid.DatasetFactory().create_dataset() - dataset.set_batch_size(128) + dataset._set_batch_size(128) Args: batch_size(int): batch size @@ -144,23 +127,11 @@ def set_batch_size(self, batch_size): """ self.proto_desc.batch_size = batch_size - def set_pv_batch_size(self, pv_batch_size): - """ - Set pv batch size. It will be effective during enable_pv_merge - - Examples: - .. code-block:: python - - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset() - dataset.set_pv_batch(128) - Args: - pv_batch_size(int): pv batch size - - """ - self.proto_desc.pv_batch_size = pv_batch_size + # temp + def debug_print(self): + print(">>>>>>>>>>batch_size: {}".format(self.proto_desc.batch_size)) - def set_thread(self, thread_num): + def _set_thread(self, thread_num): """ Set thread num, it is the num of readers. @@ -169,7 +140,7 @@ def set_thread(self, thread_num): import paddle.fluid as fluid dataset = fluid.DatasetFactory().create_dataset() - dataset.set_thread(12) + dataset._set_thread(12) Args: thread_num(int): thread num @@ -194,10 +165,10 @@ def set_filelist(self, filelist): self.dataset.set_filelist(filelist) self.filelist = filelist - def set_input_type(self, input_type): + def _set_input_type(self, input_type): self.proto_desc.input_type = input_type - def set_use_var(self, var_list): + def _set_use_var(self, var_list): """ Set Variables which you will use. 
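
With the setters above made private in this patch, end users are expected to configure a dataset through the init() call added earlier in this series instead of calling the individual set_* methods. A minimal sketch of that usage, assuming slots_vars is a list of input Variables created as in the updated unit tests later in the series:

    import paddle

    # create an InMemoryDataset and configure it in a single init() call
    dataset = paddle.distributed.fleet.DatasetFactory().create_dataset(
        "InMemoryDataset")
    dataset.init(
        batch_size=32,
        thread_num=3,
        pipe_command="cat",
        use_var=slots_vars)  # slots_vars is assumed to be defined elsewhere
    dataset.set_filelist(["a.txt", "b.txt"])
    dataset.load_into_memory()
    dataset.local_shuffle()
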
@@ -228,7 +199,7 @@ def set_use_var(self, var_list): "Currently, fluid.dataset only supports dtype=float32 and dtype=int64" ) - def set_hdfs_config(self, fs_name, fs_ugi): + def _set_hdfs_config(self, fs_name, fs_ugi): """ Set hdfs config: fs name ad ugi @@ -245,7 +216,7 @@ def set_hdfs_config(self, fs_name, fs_ugi): """ self.dataset.set_hdfs_config(fs_name, fs_ugi) - def set_download_cmd(self, download_cmd): + def _set_download_cmd(self, download_cmd): """ Set customized download cmd: download_cmd @@ -348,31 +319,36 @@ def init(self, **kwargs): data_feed_type = kwargs.get("data_feed_type", "MultiSlotInMemoryDataFeed") - self.set_feed_type(data_feed_type) + self._set_feed_type(data_feed_type) if kwargs.get("queue_num", -1) > 0: queue_num = kwargs.get("queue_num", -1) - self.set_queue_num(queue_num) + self._set_queue_num(queue_num) parse_ins_id = kwargs.get("parse_ins_id", False) if parse_ins_id: - self.set_parse_ins_id(parse_ins_id) + self._set_parse_ins_id(parse_ins_id) parse_content = kwargs.get("parse_content", False) if parse_content: - self.set_parse_content(parse_content) + self._set_parse_content(parse_content) fleet_send_batch_size = kwargs.get("fleet_send_batch_size", None) if fleet_send_batch_size: - self.set_fleet_send_batch_size(fleet_send_batch_size) + self._set_fleet_send_batch_size(fleet_send_batch_size) fleet_send_sleep_seconds = kwargs.get("fleet_send_sleep_seconds", None) if fleet_send_batch_size: - self.set_fleet_send_sleep_seconds(fleet_send_sleep_seconds) + self._set_fleet_send_sleep_seconds(fleet_send_sleep_seconds) merge_size = kwargs.get("merge_size", -1) if merge_size > 0: - self.set_merge_by_lineid(merge_size) + self._set_merge_by_lineid(merge_size) - def set_feed_type(self, data_feed_type): + fea_eval = kwargs.get("fea_eval", False) + if fea_eval: + candidate_size = kwargs.get("candidate_size", 10000) + self._set_fea_eval(candidate_size, True) + + def _set_feed_type(self, data_feed_type): """ Set data_feed_desc """ @@ -408,7 +384,7 @@ def _dynamic_adjust_after_train(self): self.dataset.dynamic_adjust_channel_num(self.thread_num, False) self.dataset.dynamic_adjust_readers_num(self.thread_num) - def set_queue_num(self, queue_num): + def _set_queue_num(self, queue_num): """ Set Dataset output queue num, training threads get data from queues @@ -426,7 +402,7 @@ def set_queue_num(self, queue_num): self.is_user_set_queue_num = True self.queue_num = queue_num - def set_parse_ins_id(self, parse_ins_id): + def _set_parse_ins_id(self, parse_ins_id): """ Set id Dataset need to parse insid @@ -443,7 +419,7 @@ def set_parse_ins_id(self, parse_ins_id): """ self.parse_ins_id = parse_ins_id - def set_parse_content(self, parse_content): + def _set_parse_content(self, parse_content): """ Set if Dataset need to parse content @@ -460,113 +436,7 @@ def set_parse_content(self, parse_content): """ self.parse_content = parse_content - def set_parse_logkey(self, parse_logkey): - """ - Set if Dataset need to parse logkey - - Args: - parse_content(bool): if parse logkey or not - - Examples: - .. code-block:: python - - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") - dataset.set_parse_logkey(True) - - """ - self.parse_logkey = parse_logkey - - def set_merge_by_sid(self, merge_by_sid): - """ - Set if Dataset need to merge sid. If not, one ins means one Pv. - - Args: - merge_by_sid(bool): if merge sid or not - - Examples: - .. 
code-block:: python - - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") - dataset.set_merge_by_sid(True) - - """ - self.merge_by_sid = merge_by_sid - - def set_enable_pv_merge(self, enable_pv_merge): - """ - Set if Dataset need to merge pv. - - Args: - enable_pv_merge(bool): if enable_pv_merge or not - - Examples: - .. code-block:: python - - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") - dataset.set_enable_pv_merge(True) - - """ - self.enable_pv_merge = enable_pv_merge - - def preprocess_instance(self): - """ - Merge pv instance and convey it from input_channel to input_pv_channel. - It will be effective when enable_pv_merge_ is True. - - Examples: - .. code-block:: python - - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") - filelist = ["a.txt", "b.txt"] - dataset.set_filelist(filelist) - dataset.load_into_memory() - dataset.preprocess_instance() - - """ - self.dataset.preprocess_instance() - - def set_current_phase(self, current_phase): - """ - Set current phase in train. It is useful for untest. - current_phase : 1 for join, 0 for update. - - Examples: - .. code-block:: python - - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") - filelist = ["a.txt", "b.txt"] - dataset.set_filelist(filelist) - dataset.load_into_memory() - dataset.set_current_phase(1) - - """ - self.dataset.set_current_phase(current_phase) - - def postprocess_instance(self): - """ - Divide pv instance and convey it to input_channel. - - Examples: - .. code-block:: python - - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") - filelist = ["a.txt", "b.txt"] - dataset.set_filelist(filelist) - dataset.load_into_memory() - dataset.preprocess_instance() - exe.train_from_dataset(dataset) - dataset.postprocess_instance() - - """ - self.dataset.postprocess_instance() - - def set_fleet_send_batch_size(self, fleet_send_batch_size=1024): + def _set_fleet_send_batch_size(self, fleet_send_batch_size=1024): """ Set fleet send batch size, default is 1024 @@ -583,7 +453,7 @@ def set_fleet_send_batch_size(self, fleet_send_batch_size=1024): """ self.fleet_send_batch_size = fleet_send_batch_size - def set_fleet_send_sleep_seconds(self, fleet_send_sleep_seconds=0): + def _set_fleet_send_sleep_seconds(self, fleet_send_sleep_seconds=0): """ Set fleet send sleep time, default is 0 @@ -600,7 +470,7 @@ def set_fleet_send_sleep_seconds(self, fleet_send_sleep_seconds=0): """ self.fleet_send_sleep_seconds = fleet_send_sleep_seconds - def set_merge_by_lineid(self, merge_size=2): + def _set_merge_by_lineid(self, merge_size=2): """ Set merge by line id, instances of same line id will be merged after shuffle, you should parse line id in data generator. @@ -772,30 +642,6 @@ def release_memory(self): """ self.dataset.release_memory() - def get_pv_data_size(self): - """ - Get memory data size of Pv, user can call this function to know the pv num - of ins in all workers after load into memory. - - Note: - This function may cause bad performance, because it has barrier - - Returns: - The size of memory pv data. - - Examples: - .. 
code-block:: python - - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") - filelist = ["a.txt", "b.txt"] - dataset.set_filelist(filelist) - dataset.load_into_memory() - print dataset.get_pv_data_size() - - """ - return self.dataset.get_pv_data_size() - def get_memory_data_size(self, fleet=None): """ Get memory data size, user can call this function to know the num @@ -870,7 +716,7 @@ def get_shuffle_data_size(self, fleet=None): return global_data_size[0] return local_data_size[0] - def set_fea_eval(self, record_candidate_size, fea_eval=True): + def _set_fea_eval(self, record_candidate_size, fea_eval=True): """ set fea eval mode for slots shuffle to debug the importance level of slots(features), fea_eval need to be set True for slots shuffle. @@ -1057,6 +903,90 @@ def __init__(self): self.boxps = core.BoxPS(self.dataset) self.proto_desc.name = "PaddleBoxDataFeed" + def _set_rank_offset(self, rank_offset): + """ + Set rank_offset for merge_pv. It set the message of Pv. + + Examples: + .. code-block:: python + + import paddle.fluid as fluid + dataset = fluid.DatasetFactory().create_dataset() + dataset.set_rank_offset("rank_offset") + + Args: + rank_offset(str): rank_offset's name + + """ + self.proto_desc.rank_offset = rank_offset + + def _set_pv_batch_size(self, pv_batch_size): + """ + Set pv batch size. It will be effective during enable_pv_merge + + Examples: + .. code-block:: python + + import paddle.fluid as fluid + dataset = fluid.DatasetFactory().create_dataset() + dataset.set_pv_batch(128) + Args: + pv_batch_size(int): pv batch size + + """ + self.proto_desc.pv_batch_size = pv_batch_size + + def _set_parse_logkey(self, parse_logkey): + """ + Set if Dataset need to parse logkey + + Args: + parse_content(bool): if parse logkey or not + + Examples: + .. code-block:: python + + import paddle.fluid as fluid + dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") + dataset.set_parse_logkey(True) + + """ + self.parse_logkey = parse_logkey + + def _set_merge_by_sid(self, merge_by_sid): + """ + Set if Dataset need to merge sid. If not, one ins means one Pv. + + Args: + merge_by_sid(bool): if merge sid or not + + Examples: + .. code-block:: python + + import paddle.fluid as fluid + dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") + dataset.set_merge_by_sid(True) + + """ + self.merge_by_sid = merge_by_sid + + def _set_enable_pv_merge(self, enable_pv_merge): + """ + Set if Dataset need to merge pv. + + Args: + enable_pv_merge(bool): if enable_pv_merge or not + + Examples: + .. code-block:: python + + import paddle.fluid as fluid + dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") + dataset.set_enable_pv_merge(True) + + """ + self.enable_pv_merge = enable_pv_merge + def set_date(self, date): """ Workaround for date @@ -1167,3 +1097,82 @@ def slots_shuffle(self, slots): """ slots_set = set(slots) self.boxps.slots_shuffle(slots_set) + + def set_current_phase(self, current_phase): + """ + Set current phase in train. It is useful for untest. + current_phase : 1 for join, 0 for update. + + Examples: + .. 
code-block:: python + + import paddle.fluid as fluid + dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") + filelist = ["a.txt", "b.txt"] + dataset.set_filelist(filelist) + dataset.load_into_memory() + dataset.set_current_phase(1) + + """ + self.dataset.set_current_phase(current_phase) + + def get_pv_data_size(self): + """ + Get memory data size of Pv, user can call this function to know the pv num + of ins in all workers after load into memory. + + Note: + This function may cause bad performance, because it has barrier + + Returns: + The size of memory pv data. + + Examples: + .. code-block:: python + + import paddle.fluid as fluid + dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") + filelist = ["a.txt", "b.txt"] + dataset.set_filelist(filelist) + dataset.load_into_memory() + print dataset.get_pv_data_size() + + """ + return self.dataset.get_pv_data_size() + + def preprocess_instance(self): + """ + Merge pv instance and convey it from input_channel to input_pv_channel. + It will be effective when enable_pv_merge_ is True. + + Examples: + .. code-block:: python + + import paddle.fluid as fluid + dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") + filelist = ["a.txt", "b.txt"] + dataset.set_filelist(filelist) + dataset.load_into_memory() + dataset.preprocess_instance() + + """ + self.dataset.preprocess_instance() + + def postprocess_instance(self): + """ + Divide pv instance and convey it to input_channel. + + Examples: + .. code-block:: python + + import paddle.fluid as fluid + dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") + filelist = ["a.txt", "b.txt"] + dataset.set_filelist(filelist) + dataset.load_into_memory() + dataset.preprocess_instance() + exe.train_from_dataset(dataset) + dataset.postprocess_instance() + + """ + self.dataset.postprocess_instance() diff --git a/python/paddle/fluid/reader.py b/python/paddle/fluid/reader.py index 76c95be75d67d..7ace0bb72a6d3 100644 --- a/python/paddle/fluid/reader.py +++ b/python/paddle/fluid/reader.py @@ -1723,13 +1723,13 @@ def __init__(self, dataset, places, drop_last): logging.warn('thread_num {} which is set in Dataset is ignored'. format(dataset.thread_num)) - dataset.set_thread(thread_num) + dataset._set_thread(thread_num) if isinstance(dataset, paddle.distributed.fleet.dataset. InMemoryDataset) and dataset.queue_num > thread_num: logging.warn("queue_num {} which is set in Dataset is ignored". 
format(dataset.queue_num)) - dataset.set_queue_num(thread_num) + dataset._set_queue_num(thread_num) self._dataset = dataset use_slots = [ diff --git a/python/paddle/fluid/tests/unittests/dist_fleet_ctr.py b/python/paddle/fluid/tests/unittests/dist_fleet_ctr.py index dc39472d7aed8..bf99bb275db52 100644 --- a/python/paddle/fluid/tests/unittests/dist_fleet_ctr.py +++ b/python/paddle/fluid/tests/unittests/dist_fleet_ctr.py @@ -209,13 +209,18 @@ def do_dataset_training(self, fleet): # config dataset dataset = paddle.distributed.fleet.DatasetFactory().create_dataset() - dataset.set_batch_size(batch_size) - dataset.set_use_var(self.feeds) + dataset.init( + batch_size=batch_size, + use_var=self.feeds, + pipe_command=pipe_command, + thread_num=thread_num) + dataset._set_batch_size(batch_size) + dataset._set_use_var(self.feeds) pipe_command = 'python ctr_dataset_reader.py' - dataset.set_pipe_command(pipe_command) + dataset._set_pipe_command(pipe_command) dataset.set_filelist(filelist) - dataset.set_thread(thread_num) + dataset._set_thread(thread_num) for epoch_id in range(1): pass_start = time.time() diff --git a/python/paddle/fluid/tests/unittests/dist_fleet_ctr_ps_gpu.py b/python/paddle/fluid/tests/unittests/dist_fleet_ctr_ps_gpu.py index 03d0fa447daf3..4318db31b9a5c 100644 --- a/python/paddle/fluid/tests/unittests/dist_fleet_ctr_ps_gpu.py +++ b/python/paddle/fluid/tests/unittests/dist_fleet_ctr_ps_gpu.py @@ -115,13 +115,13 @@ def do_dataset_training(self, fleet): # config dataset dataset = paddle.fleet.DatasetFactory().create_dataset() - dataset.set_batch_size(batch_size) - dataset.set_use_var(self.feeds) + dataset._set_batch_size(batch_size) + dataset._set_use_var(self.feeds) pipe_command = 'python ctr_dataset_reader.py' - dataset.set_pipe_command(pipe_command) + dataset._set_pipe_command(pipe_command) dataset.set_filelist(filelist) - dataset.set_thread(thread_num) + dataset._set_thread(thread_num) for epoch_id in range(1): pass_start = time.time() diff --git a/python/paddle/fluid/tests/unittests/dist_fleet_heter_ctr.py b/python/paddle/fluid/tests/unittests/dist_fleet_heter_ctr.py index 7a4e7534f0739..636a6c40660b8 100644 --- a/python/paddle/fluid/tests/unittests/dist_fleet_heter_ctr.py +++ b/python/paddle/fluid/tests/unittests/dist_fleet_heter_ctr.py @@ -184,13 +184,13 @@ def do_dataset_training(self, fleet): # config dataset dataset = paddle.distributed.fleet.DatasetFactory().create_dataset() - dataset.set_batch_size(batch_size) - dataset.set_use_var(self.feeds) + dataset._set_batch_size(batch_size) + dataset._set_use_var(self.feeds) pipe_command = 'python ctr_dataset_reader.py' - dataset.set_pipe_command(pipe_command) + dataset._set_pipe_command(pipe_command) dataset.set_filelist(filelist) - dataset.set_thread(thread_num) + dataset._set_thread(thread_num) for epoch_id in range(1): pass_start = time.time() diff --git a/python/paddle/fluid/tests/unittests/test_dataset.py b/python/paddle/fluid/tests/unittests/test_dataset.py index e7dc35ad9a088..f4e1d0a15f7e9 100644 --- a/python/paddle/fluid/tests/unittests/test_dataset.py +++ b/python/paddle/fluid/tests/unittests/test_dataset.py @@ -97,16 +97,16 @@ def test_run_with_dump(self): dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( "InMemoryDataset") - dataset.set_batch_size(32) - dataset.set_thread(3) + dataset._set_batch_size(32) + dataset._set_thread(3) dataset.set_filelist( ["test_run_with_dump_a.txt", "test_run_with_dump_b.txt"]) - dataset.set_parse_ins_id(True) - dataset.set_parse_content(True) - 
dataset.set_pipe_command("cat") - dataset.set_use_var(slots_vars) + dataset._set_parse_ins_id(True) + dataset._set_parse_content(True) + dataset._set_pipe_command("cat") + dataset._set_use_var(slots_vars) dataset.load_into_memory() - dataset.set_fea_eval(10000, True) + dataset._set_fea_eval(10000, True) dataset.local_shuffle() exe = fluid.Executor(fluid.CPUPlace()) @@ -178,12 +178,12 @@ def test_set_download_cmd(self): dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( "InMemoryDataset") - dataset.set_batch_size(32) - dataset.set_thread(3) + dataset._set_batch_size(32) + dataset._set_thread(3) dataset.set_filelist([filename1, filename2]) - dataset.set_pipe_command("cat") - dataset.set_download_cmd("cat") - dataset.set_use_var(slots_vars) + dataset._set_pipe_command("cat") + dataset._set_download_cmd("cat") + dataset._set_use_var(slots_vars) dataset.load_into_memory() exe = fluid.Executor(fluid.CPUPlace()) exe.run(fluid.default_startup_program()) @@ -230,16 +230,16 @@ def test_in_memory_dataset_run(self): dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( "InMemoryDataset") - dataset.set_batch_size(32) - dataset.set_thread(3) + dataset._set_batch_size(32) + dataset._set_thread(3) dataset.set_filelist([ "test_in_memory_dataset_run_a.txt", "test_in_memory_dataset_run_b.txt" ]) - dataset.set_pipe_command("cat") - dataset.set_use_var(slots_vars) + dataset._set_pipe_command("cat") + dataset._set_use_var(slots_vars) dataset.load_into_memory() - dataset.set_fea_eval(1, True) + dataset._set_fea_eval(1, True) dataset.slots_shuffle(["slot1"]) dataset.local_shuffle() dataset.set_generate_unique_feasigns(True, 15) @@ -302,15 +302,15 @@ def test_in_memory_dataset_masterpatch(self): dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( "InMemoryDataset") - dataset.set_batch_size(32) - dataset.set_thread(1) - dataset.set_parse_ins_id(True) + dataset._set_batch_size(32) + dataset._set_thread(1) + dataset._set_parse_ins_id(True) dataset.set_filelist([ "test_in_memory_dataset_masterpatch_a.txt", "test_in_memory_dataset_masterpatch_b.txt" ]) - dataset.set_pipe_command("cat") - dataset.set_use_var(slots_vars) + dataset._set_pipe_command("cat") + dataset._set_use_var(slots_vars) dataset.load_into_memory() dataset.local_shuffle() @@ -325,7 +325,7 @@ def test_in_memory_dataset_masterpatch(self): except Exception as e: self.assertTrue(False) - dataset.set_merge_by_lineid(2) + dataset._set_merge_by_lineid(2) dataset.dataset.merge_by_lineid() os.remove("./test_in_memory_dataset_masterpatch_a.txt") @@ -369,15 +369,15 @@ def test_in_memory_dataset_masterpatch1(self): dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( "InMemoryDataset") - dataset.set_batch_size(32) - dataset.set_thread(1) - dataset.set_parse_ins_id(True) + dataset._set_batch_size(32) + dataset._set_thread(1) + dataset._set_parse_ins_id(True) dataset.set_filelist([ "test_in_memory_dataset_masterpatch1_a.txt", "test_in_memory_dataset_masterpatch1_b.txt" ]) - dataset.set_pipe_command("cat") - dataset.set_use_var(slots_vars) + dataset._set_pipe_command("cat") + dataset._set_use_var(slots_vars) dataset.load_into_memory() dataset.local_shuffle() @@ -392,7 +392,7 @@ def test_in_memory_dataset_masterpatch1(self): except Exception as e: self.assertTrue(False) - dataset.set_merge_by_lineid(2) + dataset._set_merge_by_lineid(2) dataset.dataset.merge_by_lineid() os.remove("./test_in_memory_dataset_masterpatch1_a.txt") @@ -425,14 +425,14 @@ def test_in_memory_dataset_run_2(self): dataset = 
paddle.distributed.fleet.DatasetFactory().create_dataset( "InMemoryDataset") - dataset.set_batch_size(32) - dataset.set_thread(3) + dataset._set_batch_size(32) + dataset._set_thread(3) dataset.set_filelist([ "test_in_memory_dataset_run_a.txt", "test_in_memory_dataset_run_b.txt" ]) - dataset.set_pipe_command("cat") - dataset.set_use_var(slots_vars) + dataset._set_pipe_command("cat") + dataset._set_use_var(slots_vars) dataset.load_into_memory() dataset.local_shuffle() @@ -473,9 +473,9 @@ def test_in_memory_dataset_run_2(self): except Exception as e: self.assertTrue(False) - dataset.set_merge_by_lineid(2) - dataset.set_parse_ins_id(False) - dataset.set_fleet_send_sleep_seconds(2) + dataset._set_merge_by_lineid(2) + dataset._set_parse_ins_id(False) + dataset._set_fleet_send_sleep_seconds(2) dataset.preload_into_memory() dataset.wait_preload_done() dataset.release_memory() @@ -483,8 +483,8 @@ def test_in_memory_dataset_run_2(self): dataset.wait_preload_done() dataset.dataset.merge_by_lineid() dataset.release_memory() - dataset.set_merge_by_lineid(30) - dataset.set_parse_ins_id(False) + dataset._set_merge_by_lineid(30) + dataset._set_parse_ins_id(False) dataset.load_into_memory() dataset.dataset.merge_by_lineid() fleet_ptr = fluid.core.Fleet() @@ -519,12 +519,12 @@ def test_queue_dataset_run(self): dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( "QueueDataset") - dataset.set_batch_size(32) - dataset.set_thread(3) + dataset._set_batch_size(32) + dataset._set_thread(3) dataset.set_filelist( ["test_queue_dataset_run_a.txt", "test_queue_dataset_run_b.txt"]) - dataset.set_pipe_command("cat") - dataset.set_use_var(slots_vars) + dataset._set_pipe_command("cat") + dataset._set_use_var(slots_vars) exe = fluid.Executor(fluid.CPUPlace()) exe.run(fluid.default_startup_program()) @@ -545,10 +545,10 @@ def test_queue_dataset_run(self): dataset2 = paddle.distributed.fleet.DatasetFactory().create_dataset( "QueueDataset") - dataset2.set_use_var(slots_vars) - dataset2.set_batch_size(32) - dataset2.set_thread(3) - dataset2.set_pipe_command("cat") + dataset2._set_use_var(slots_vars) + dataset2._set_batch_size(32) + dataset2._set_thread(3) + dataset2._set_pipe_command("cat") dataset.set_filelist([]) try: exe.train_from_dataset(fluid.default_main_program(), dataset2) @@ -587,12 +587,12 @@ def test_queue_dataset_run_2(self): dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( "QueueDataset") - dataset.set_batch_size(32) - dataset.set_thread(3) + dataset._set_batch_size(32) + dataset._set_thread(3) dataset.set_filelist( ["test_queue_dataset_run_a.txt", "test_queue_dataset_run_b.txt"]) - dataset.set_pipe_command("cat") - dataset.set_use_var(slots_vars) + dataset._set_pipe_command("cat") + dataset._set_use_var(slots_vars) exe = fluid.Executor(fluid.CPUPlace() if not core.is_compiled_with_cuda( ) else fluid.CUDAPlace(0)) @@ -643,13 +643,13 @@ def test_queue_dataset_run_3(self): dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( "InMemoryDataset") - dataset.set_input_type(1) - dataset.set_batch_size(1) - dataset.set_thread(2) + dataset._set_input_type(1) + dataset._set_batch_size(1) + dataset._set_thread(2) dataset.set_filelist( ["test_queue_dataset_run_a.txt", "test_queue_dataset_run_b.txt"]) - dataset.set_pipe_command("cat") - dataset.set_use_var(slots_vars) + dataset._set_pipe_command("cat") + dataset._set_use_var(slots_vars) dataset.load_into_memory() exe = fluid.Executor(fluid.CPUPlace() if not core.is_compiled_with_cuda( @@ -723,11 +723,11 @@ def get_dataset(self, 
inputs, files): """ dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( "QueueDataset") - dataset.set_batch_size(32) - dataset.set_thread(3) + dataset._set_batch_size(32) + dataset._set_thread(3) dataset.set_filelist(files) - dataset.set_pipe_command("cat") - dataset.set_use_var(inputs) + dataset._set_pipe_command("cat") + dataset._set_use_var(inputs) return dataset def setUp(self): @@ -882,13 +882,13 @@ def test_dataset_fleet(self): dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( "InMemoryDataset") dataset.set_batch_size(32) - dataset.set_thread(3) + dataset._set_thread(3) dataset.set_filelist([ "test_in_memory_dataset2_run_a.txt", "test_in_memory_dataset2_run_b.txt" ]) - dataset.set_pipe_command("cat") - dataset.set_use_var(slots_vars) + dataset._set_pipe_command("cat") + dataset._set_use_var(slots_vars) dataset.load_into_memory() fleet._opt_info = None fleet._fleet_ptr = None @@ -957,13 +957,14 @@ def test_dataset_fleet2(self): pipe_command="cat", use_var=slots_vars) #dataset.set_batch_size(32) - #dataset.set_thread(3) + #dataset._set_thread(3) + dataset.debug_print() dataset.set_filelist([ "test_in_memory_dataset2_run2_a.txt", "test_in_memory_dataset2_run2_b.txt" ]) - #dataset.set_pipe_command("cat") - #dataset.set_use_var(slots_vars) + #dataset._set_pipe_command("cat") + #dataset._set_use_var(slots_vars) dataset.load_into_memory() try: dataset.global_shuffle(fleet) @@ -973,12 +974,12 @@ def test_dataset_fleet2(self): fleet._fleet_ptr = None dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( "InMemoryDataset") - dataset.set_rank_offset("") - dataset.set_pv_batch_size(1) - dataset.set_hdfs_config("", "") + # dataset.set_rank_offset("") + # dataset.set_pv_batch_size(1) + dataset._set_hdfs_config("", "") d = paddle.distributed.fleet.DatasetBase() try: - dataset.set_feed_type("MultiSlotInMemoryDataFeed") + dataset._set_feed_type("MultiSlotInMemoryDataFeed") except: print("warning: catch expected error") dataset.thread_num = 0 @@ -986,9 +987,9 @@ def test_dataset_fleet2(self): dataset._prepare_to_run() except: print("warning: catch expected error") - dataset.set_parse_logkey(True) - dataset.set_merge_by_sid(True) - dataset.set_enable_pv_merge(True) + #dataset.set_parse_logkey(True) + #dataset.set_merge_by_sid(True) + #dataset.set_enable_pv_merge(True) try: dataset.preprocess_instance() except: @@ -1001,12 +1002,12 @@ def test_dataset_fleet2(self): dataset.postprocess_instance() except: print("warning: catch expected error") - dataset.set_fleet_send_batch_size(1024) + dataset._set_fleet_send_batch_size(1024) try: dataset.global_shuffle() except: print("warning: catch expected error") - dataset.get_pv_data_size() + #dataset.get_pv_data_size() dataset.get_memory_data_size() dataset.get_shuffle_data_size() dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( diff --git a/python/paddle/fluid/tests/unittests/test_dataset_dataloader.py b/python/paddle/fluid/tests/unittests/test_dataset_dataloader.py index c13c33f209f0f..a4494e88aa9d6 100644 --- a/python/paddle/fluid/tests/unittests/test_dataset_dataloader.py +++ b/python/paddle/fluid/tests/unittests/test_dataset_dataloader.py @@ -99,7 +99,7 @@ def check_batch_number(self, place, randomize_batch_num=False): main_prog, startup_prog, feeds = self.build_network() dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( self.dataset_name) - dataset.set_batch_size(BATCH_SIZE) + dataset._set_batch_size(BATCH_SIZE) if isinstance(place, fluid.CPUPlace): file_num = 10 @@ -128,8 
+128,8 @@ def check_batch_number(self, place, randomize_batch_num=False): fake_reader(batch_num=BATCH_NUM + random_delta_batch_size[i])) dataset.set_filelist(filelist) - dataset.set_use_var(feeds) - dataset.set_pipe_command("cat") + dataset._set_use_var(feeds) + dataset._set_pipe_command("cat") if self.dataset_name == 'InMemoryDataset': dataset.load_into_memory() diff --git a/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_2.py b/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_2.py index eb5d9eb66608d..722949934eb34 100644 --- a/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_2.py +++ b/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_2.py @@ -166,7 +166,7 @@ def test_pslib_2(self): dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( "InMemoryDataset") dataset.set_filelist(["test_fleet_gloo_role_maker_1.txt"]) - dataset.set_use_var([show, label]) + dataset._set_use_var([show, label]) dataset.load_into_memory() dataset.get_memory_data_size(fleet) dataset.get_shuffle_data_size(fleet) diff --git a/python/paddle/fluid/tests/unittests/test_monitor.py b/python/paddle/fluid/tests/unittests/test_monitor.py index f6207edb41c19..10e196b754c2d 100644 --- a/python/paddle/fluid/tests/unittests/test_monitor.py +++ b/python/paddle/fluid/tests/unittests/test_monitor.py @@ -54,16 +54,16 @@ def test_dataset_run_with_stat(self): dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( "InMemoryDataset") - dataset.set_batch_size(32) + dataset._set_batch_size(32) dataset.set_thread(3) dataset.set_filelist([ "test_in_memory_dataset_run_a.txt", "test_in_memory_dataset_run_b.txt" ]) - dataset.set_pipe_command("cat") - dataset.set_use_var(slots_vars) + dataset._set_pipe_command("cat") + dataset._set_use_var(slots_vars) dataset.load_into_memory() - dataset.set_fea_eval(1, True) + dataset._set_fea_eval(1, True) dataset.slots_shuffle(["slot1"]) exe = fluid.Executor(fluid.CPUPlace()) From f32f283cb26d2d82a9c43111cb9d4d69f85ef847 Mon Sep 17 00:00:00 2001 From: yaoxuefeng6 Date: Tue, 8 Sep 2020 19:21:05 +0800 Subject: [PATCH 04/14] fix boxps dataset apis --- .../distributed/fleet/dataset/dataset.py | 23 ++++++++++++++++--- .../fluid/tests/unittests/test_dataset.py | 4 ++-- .../fluid/tests/unittests/test_monitor.py | 2 +- 3 files changed, 23 insertions(+), 6 deletions(-) diff --git a/python/paddle/distributed/fleet/dataset/dataset.py b/python/paddle/distributed/fleet/dataset/dataset.py index 051af970a4d8a..0e83840df62e0 100644 --- a/python/paddle/distributed/fleet/dataset/dataset.py +++ b/python/paddle/distributed/fleet/dataset/dataset.py @@ -490,13 +490,13 @@ def _set_merge_by_lineid(self, merge_size=2): self.merge_by_lineid = True self.parse_ins_id = True - def set_generate_unique_feasigns(self, generate_uni_feasigns, shard_num): + def _set_generate_unique_feasigns(self, generate_uni_feasigns, shard_num): self.dataset.set_generate_unique_feasigns(generate_uni_feasigns) self.gen_uni_feasigns = generate_uni_feasigns self.local_shard_num = shard_num - def generate_local_tables_unlock(self, table_id, fea_dim, read_thread_num, - consume_thread_num, shard_num): + def _generate_local_tables_unlock(self, table_id, fea_dim, read_thread_num, + consume_thread_num, shard_num): self.dataset.generate_local_tables_unlock( table_id, fea_dim, read_thread_num, consume_thread_num, shard_num) @@ -903,6 +903,23 @@ def __init__(self): self.boxps = core.BoxPS(self.dataset) self.proto_desc.name = "PaddleBoxDataFeed" + def init(self, **kwargs): + """ + should be called only once in 
user's python scripts to initialize seetings of dataset instance + """ + super(BoxPSDataset, self).init(**kwargs) + + rank_offset = kwargs.get("rank_offset", "") + self._set_rank_offset(rank_offset) + pv_batch_size = kwargs.get("pv_batch_size", 1) + self._set_pv_batch_size(pv_batch_size) + parse_logkey = kwargs.get("parse_logkey", False) + self._set_parse_logkey(parse_logkey) + merge_by_sid = kwargs.get("merge_by_sid", False) + self._set_merge_by_sid(merge_by_sid) + enable_pv_merge = kwargs.get("enable_pv_merge", False) + self._set_enable_pv_merge(enable_pv_merge) + def _set_rank_offset(self, rank_offset): """ Set rank_offset for merge_pv. It set the message of Pv. diff --git a/python/paddle/fluid/tests/unittests/test_dataset.py b/python/paddle/fluid/tests/unittests/test_dataset.py index f4e1d0a15f7e9..48801a0967f25 100644 --- a/python/paddle/fluid/tests/unittests/test_dataset.py +++ b/python/paddle/fluid/tests/unittests/test_dataset.py @@ -242,8 +242,8 @@ def test_in_memory_dataset_run(self): dataset._set_fea_eval(1, True) dataset.slots_shuffle(["slot1"]) dataset.local_shuffle() - dataset.set_generate_unique_feasigns(True, 15) - dataset.generate_local_tables_unlock(0, 11, 1, 25, 15) + dataset._set_generate_unique_feasigns(True, 15) + dataset._generate_local_tables_unlock(0, 11, 1, 25, 15) exe = fluid.Executor(fluid.CPUPlace()) exe.run(fluid.default_startup_program()) if self.use_data_loader: diff --git a/python/paddle/fluid/tests/unittests/test_monitor.py b/python/paddle/fluid/tests/unittests/test_monitor.py index 10e196b754c2d..9dd1092976bf3 100644 --- a/python/paddle/fluid/tests/unittests/test_monitor.py +++ b/python/paddle/fluid/tests/unittests/test_monitor.py @@ -55,7 +55,7 @@ def test_dataset_run_with_stat(self): dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( "InMemoryDataset") dataset._set_batch_size(32) - dataset.set_thread(3) + dataset._set_thread(3) dataset.set_filelist([ "test_in_memory_dataset_run_a.txt", "test_in_memory_dataset_run_b.txt" From 292e3356455749e97e2ab827c913ab7ea6e2c044 Mon Sep 17 00:00:00 2001 From: yaoxuefeng6 Date: Wed, 9 Sep 2020 00:28:06 +0800 Subject: [PATCH 05/14] add box_bs related ut --- .../distributed/fleet/dataset/dataset.py | 21 +- .../fluid/tests/unittests/test_dataset.py | 240 +++++++++++++----- 2 files changed, 186 insertions(+), 75 deletions(-) diff --git a/python/paddle/distributed/fleet/dataset/dataset.py b/python/paddle/distributed/fleet/dataset/dataset.py index 0e83840df62e0..2335a1927a898 100644 --- a/python/paddle/distributed/fleet/dataset/dataset.py +++ b/python/paddle/distributed/fleet/dataset/dataset.py @@ -324,24 +324,23 @@ def init(self, **kwargs): if kwargs.get("queue_num", -1) > 0: queue_num = kwargs.get("queue_num", -1) self._set_queue_num(queue_num) + merge_size = kwargs.get("merge_size", -1) + if merge_size > 0: + self._set_merge_by_lineid(merge_size) + parse_ins_id = kwargs.get("parse_ins_id", False) - if parse_ins_id: - self._set_parse_ins_id(parse_ins_id) + self._set_parse_ins_id(parse_ins_id) parse_content = kwargs.get("parse_content", False) - if parse_content: - self._set_parse_content(parse_content) + self._set_parse_content(parse_content) fleet_send_batch_size = kwargs.get("fleet_send_batch_size", None) if fleet_send_batch_size: self._set_fleet_send_batch_size(fleet_send_batch_size) fleet_send_sleep_seconds = kwargs.get("fleet_send_sleep_seconds", None) - if fleet_send_batch_size: + if fleet_send_sleep_seconds: self._set_fleet_send_sleep_seconds(fleet_send_sleep_seconds) - merge_size = 
kwargs.get("merge_size", -1) - if merge_size > 0: - self._set_merge_by_lineid(merge_size) fea_eval = kwargs.get("fea_eval", False) if fea_eval: @@ -782,6 +781,12 @@ def __init__(self): super(QueueDataset, self).__init__() self.proto_desc.name = "MultiSlotDataFeed" + def init(self, **kwargs): + """ + should be called only once in user's python scripts to initialize seetings of dataset instance + """ + super(QueueDataset, self).init(**kwargs) + def _prepare_to_run(self): """ Set data_feed_desc/thread num/filelist before run, diff --git a/python/paddle/fluid/tests/unittests/test_dataset.py b/python/paddle/fluid/tests/unittests/test_dataset.py index 48801a0967f25..91f3eaa8ac897 100644 --- a/python/paddle/fluid/tests/unittests/test_dataset.py +++ b/python/paddle/fluid/tests/unittests/test_dataset.py @@ -97,16 +97,18 @@ def test_run_with_dump(self): dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( "InMemoryDataset") - dataset._set_batch_size(32) - dataset._set_thread(3) + dataset.init( + batch_size=32, + thread_num=3, + parse_ins_id=True, + parse_content=True, + pipe_command="cat", + use_var=slots_vars, + fea_eval=True, + candidate_size=10000) dataset.set_filelist( ["test_run_with_dump_a.txt", "test_run_with_dump_b.txt"]) - dataset._set_parse_ins_id(True) - dataset._set_parse_content(True) - dataset._set_pipe_command("cat") - dataset._set_use_var(slots_vars) dataset.load_into_memory() - dataset._set_fea_eval(10000, True) dataset.local_shuffle() exe = fluid.Executor(fluid.CPUPlace()) @@ -178,12 +180,13 @@ def test_set_download_cmd(self): dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( "InMemoryDataset") - dataset._set_batch_size(32) - dataset._set_thread(3) + dataset.init( + batch_size=32, + thread_num=3, + pipe_command="cat", + download_cmd="cat", + use_var=slots_vars) dataset.set_filelist([filename1, filename2]) - dataset._set_pipe_command("cat") - dataset._set_download_cmd("cat") - dataset._set_use_var(slots_vars) dataset.load_into_memory() exe = fluid.Executor(fluid.CPUPlace()) exe.run(fluid.default_startup_program()) @@ -230,16 +233,18 @@ def test_in_memory_dataset_run(self): dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( "InMemoryDataset") - dataset._set_batch_size(32) - dataset._set_thread(3) + dataset.init( + batch_size=32, + thread_num=3, + pipe_command="cat", + use_var=slots_vars, + fea_eval=True, + candidate_size=1) dataset.set_filelist([ "test_in_memory_dataset_run_a.txt", "test_in_memory_dataset_run_b.txt" ]) - dataset._set_pipe_command("cat") - dataset._set_use_var(slots_vars) dataset.load_into_memory() - dataset._set_fea_eval(1, True) dataset.slots_shuffle(["slot1"]) dataset.local_shuffle() dataset._set_generate_unique_feasigns(True, 15) @@ -302,15 +307,16 @@ def test_in_memory_dataset_masterpatch(self): dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( "InMemoryDataset") - dataset._set_batch_size(32) - dataset._set_thread(1) - dataset._set_parse_ins_id(True) + dataset.init( + batch_size=32, + thread_num=1, + parse_ins_id=True, + pipe_command="cat", + use_var=slots_vars) dataset.set_filelist([ "test_in_memory_dataset_masterpatch_a.txt", "test_in_memory_dataset_masterpatch_b.txt" ]) - dataset._set_pipe_command("cat") - dataset._set_use_var(slots_vars) dataset.load_into_memory() dataset.local_shuffle() @@ -369,15 +375,16 @@ def test_in_memory_dataset_masterpatch1(self): dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( "InMemoryDataset") - dataset._set_batch_size(32) - 
dataset._set_thread(1) - dataset._set_parse_ins_id(True) + dataset.init( + batch_size=32, + thread_num=1, + parse_ins_id=True, + pipe_command="cat", + use_var=slots_vars) dataset.set_filelist([ "test_in_memory_dataset_masterpatch1_a.txt", "test_in_memory_dataset_masterpatch1_b.txt" ]) - dataset._set_pipe_command("cat") - dataset._set_use_var(slots_vars) dataset.load_into_memory() dataset.local_shuffle() @@ -425,14 +432,12 @@ def test_in_memory_dataset_run_2(self): dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( "InMemoryDataset") - dataset._set_batch_size(32) - dataset._set_thread(3) + dataset.init( + batch_size=32, thread_num=3, pipe_command="cat", use_var=slots_vars) dataset.set_filelist([ "test_in_memory_dataset_run_a.txt", "test_in_memory_dataset_run_b.txt" ]) - dataset._set_pipe_command("cat") - dataset._set_use_var(slots_vars) dataset.load_into_memory() dataset.local_shuffle() @@ -519,12 +524,10 @@ def test_queue_dataset_run(self): dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( "QueueDataset") - dataset._set_batch_size(32) - dataset._set_thread(3) + dataset.init( + batch_size=32, thread_num=3, pipe_command="cat", use_var=slots_vars) dataset.set_filelist( ["test_queue_dataset_run_a.txt", "test_queue_dataset_run_b.txt"]) - dataset._set_pipe_command("cat") - dataset._set_use_var(slots_vars) exe = fluid.Executor(fluid.CPUPlace()) exe.run(fluid.default_startup_program()) @@ -545,10 +548,8 @@ def test_queue_dataset_run(self): dataset2 = paddle.distributed.fleet.DatasetFactory().create_dataset( "QueueDataset") - dataset2._set_use_var(slots_vars) - dataset2._set_batch_size(32) - dataset2._set_thread(3) - dataset2._set_pipe_command("cat") + dataset2.init( + batch_size=32, thread_num=3, pipe_command="cat", use_var=slots_vars) dataset.set_filelist([]) try: exe.train_from_dataset(fluid.default_main_program(), dataset2) @@ -587,12 +588,10 @@ def test_queue_dataset_run_2(self): dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( "QueueDataset") - dataset._set_batch_size(32) - dataset._set_thread(3) + dataset.init( + batch_size=32, thread_num=3, pipe_command="cat", use_var=slots_vars) dataset.set_filelist( ["test_queue_dataset_run_a.txt", "test_queue_dataset_run_b.txt"]) - dataset._set_pipe_command("cat") - dataset._set_use_var(slots_vars) exe = fluid.Executor(fluid.CPUPlace() if not core.is_compiled_with_cuda( ) else fluid.CUDAPlace(0)) @@ -643,13 +642,14 @@ def test_queue_dataset_run_3(self): dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( "InMemoryDataset") - dataset._set_input_type(1) - dataset._set_batch_size(1) - dataset._set_thread(2) + dataset.init( + batch_size=1, + thread_num=2, + input_type=1, + pipe_command="cat", + use_var=slots_vars) dataset.set_filelist( ["test_queue_dataset_run_a.txt", "test_queue_dataset_run_b.txt"]) - dataset._set_pipe_command("cat") - dataset._set_use_var(slots_vars) dataset.load_into_memory() exe = fluid.Executor(fluid.CPUPlace() if not core.is_compiled_with_cuda( @@ -723,11 +723,9 @@ def get_dataset(self, inputs, files): """ dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( "QueueDataset") - dataset._set_batch_size(32) - dataset._set_thread(3) + dataset.init( + batch_size=32, thread_num=3, pipe_command="cat", use_var=inputs) dataset.set_filelist(files) - dataset._set_pipe_command("cat") - dataset._set_use_var(inputs) return dataset def setUp(self): @@ -881,14 +879,16 @@ def test_dataset_fleet(self): exe.run(startup_program) dataset = 
paddle.distributed.fleet.DatasetFactory().create_dataset( "InMemoryDataset") - dataset.set_batch_size(32) - dataset._set_thread(3) + + dataset.init( + batch_size=32, + thread_num=3, + pipe_command="cat", + use_var=slots_vars) dataset.set_filelist([ "test_in_memory_dataset2_run_a.txt", "test_in_memory_dataset2_run_b.txt" ]) - dataset._set_pipe_command("cat") - dataset._set_use_var(slots_vars) dataset.load_into_memory() fleet._opt_info = None fleet._fleet_ptr = None @@ -956,15 +956,10 @@ def test_dataset_fleet2(self): thread_num=3, pipe_command="cat", use_var=slots_vars) - #dataset.set_batch_size(32) - #dataset._set_thread(3) - dataset.debug_print() dataset.set_filelist([ "test_in_memory_dataset2_run2_a.txt", "test_in_memory_dataset2_run2_b.txt" ]) - #dataset._set_pipe_command("cat") - #dataset._set_use_var(slots_vars) dataset.load_into_memory() try: dataset.global_shuffle(fleet) @@ -974,9 +969,7 @@ def test_dataset_fleet2(self): fleet._fleet_ptr = None dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( "InMemoryDataset") - # dataset.set_rank_offset("") - # dataset.set_pv_batch_size(1) - dataset._set_hdfs_config("", "") + dataset.init(fs_name="", fs_ugi="") d = paddle.distributed.fleet.DatasetBase() try: dataset._set_feed_type("MultiSlotInMemoryDataFeed") @@ -987,9 +980,6 @@ def test_dataset_fleet2(self): dataset._prepare_to_run() except: print("warning: catch expected error") - #dataset.set_parse_logkey(True) - #dataset.set_merge_by_sid(True) - #dataset.set_enable_pv_merge(True) try: dataset.preprocess_instance() except: @@ -1033,6 +1023,122 @@ def test_dataset_fleet2(self): os.remove("./test_in_memory_dataset2_run2_a.txt") os.remove("./test_in_memory_dataset2_run2_b.txt") + def test_bosps_dataset_fleet2(self): + """ + Testcase for InMemoryDataset from create to run. 
+ """ + with open("test_in_memory_dataset2_run2_a.txt", "w") as f: + data = "1 1 2 3 3 4 5 5 5 5 1 1\n" + data += "1 2 2 3 4 4 6 6 6 6 1 2\n" + data += "1 3 2 3 5 4 7 7 7 7 1 3\n" + f.write(data) + with open("test_in_memory_dataset2_run2_b.txt", "w") as f: + data = "1 4 2 3 3 4 5 5 5 5 1 4\n" + data += "1 5 2 3 4 4 6 6 6 6 1 5\n" + data += "1 6 2 3 5 4 7 7 7 7 1 6\n" + data += "1 7 2 3 6 4 8 8 8 8 1 7\n" + f.write(data) + + train_program = fluid.Program() + startup_program = fluid.Program() + scope = fluid.Scope() + from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet + with fluid.program_guard(train_program, startup_program): + slots = ["slot1_ff", "slot2_ff", "slot3_ff", "slot4_ff"] + slots_vars = [] + for slot in slots: + var = fluid.layers.data(\ + name=slot, shape=[1], dtype="float32", lod_level=1) + slots_vars.append(var) + fake_cost = \ + fluid.layers.elementwise_sub(slots_vars[0], slots_vars[-1]) + fake_cost = fluid.layers.mean(fake_cost) + with fluid.scope_guard(scope): + place = fluid.CPUPlace() + exe = fluid.Executor(place) + try: + fleet.init() + except ImportError as e: + print("warning: no mpi4py") + adam = fluid.optimizer.Adam(learning_rate=0.000005) + try: + adam = fleet.distributed_optimizer( + adam, + strategy={ + "fs_uri": "fs_uri_xxx", + "fs_user": "fs_user_xxx", + "fs_passwd": "fs_passwd_xxx", + "fs_hadoop_bin": "fs_hadoop_bin_xxx" + }) + adam.minimize([fake_cost], [scope]) + except AttributeError as e: + print("warning: no mpi") + except ImportError as e: + print("warning: no mpi4py") + exe.run(startup_program) + dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( + "BoxPSDataset") + dataset.init( + batch_size=32, + thread_num=3, + pipe_command="cat", + use_var=slots_vars) + dataset.set_filelist([ + "test_in_memory_dataset2_run2_a.txt", + "test_in_memory_dataset2_run2_b.txt" + ]) + dataset.load_into_memory() + try: + dataset.global_shuffle(fleet) + except: + print("warning: catch expected error") + fleet._opt_info = None + fleet._fleet_ptr = None + dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( + "BoxPSDataset") + dataset.init( + rank_offset="", + pv_batch_size=1, + fs_name="", + fs_ugi="", + data_feed_type="MultiSlotInMemoryDataFeed", + parse_logkey=True, + merge_by_sid=True, + enable_pv_merge=True) + d = paddle.distributed.fleet.DatasetBase() + try: + dataset._set_feed_type("MultiSlotInMemoryDataFeed") + except: + print("warning: catch expected error") + dataset.thread_num = 0 + try: + dataset._prepare_to_run() + except: + print("warning: catch expected error") + dataset._set_parse_logkey(True) + dataset._set_merge_by_sid(True) + dataset._set_enable_pv_merge(True) + try: + dataset.preprocess_instance() + except: + print("warning: catch expected error") + try: + dataset.set_current_phase(1) + except: + print("warning: catch expected error") + try: + dataset.postprocess_instance() + except: + print("warning: catch expected error") + dataset._set_fleet_send_batch_size(1024) + try: + dataset.global_shuffle() + except: + print("warning: catch expected error") + #dataset.get_pv_data_size() + dataset.get_memory_data_size() + dataset.get_shuffle_data_size() + if __name__ == '__main__': unittest.main() From a1677a43c12f8d17c4b193a1ade3a10ead45d4ca Mon Sep 17 00:00:00 2001 From: yaoxuefeng6 Date: Wed, 9 Sep 2020 14:27:22 +0800 Subject: [PATCH 06/14] test ut fail --- python/paddle/distributed/fleet/dataset/dataset.py | 4 ---- python/paddle/fluid/tests/unittests/dist_fleet_ctr.py | 4 +++- 2 files changed, 3 insertions(+), 5 
deletions(-) diff --git a/python/paddle/distributed/fleet/dataset/dataset.py b/python/paddle/distributed/fleet/dataset/dataset.py index 2335a1927a898..ae4bbbd423f6d 100644 --- a/python/paddle/distributed/fleet/dataset/dataset.py +++ b/python/paddle/distributed/fleet/dataset/dataset.py @@ -127,10 +127,6 @@ def _set_batch_size(self, batch_size): """ self.proto_desc.batch_size = batch_size - # temp - def debug_print(self): - print(">>>>>>>>>>batch_size: {}".format(self.proto_desc.batch_size)) - def _set_thread(self, thread_num): """ Set thread num, it is the num of readers. diff --git a/python/paddle/fluid/tests/unittests/dist_fleet_ctr.py b/python/paddle/fluid/tests/unittests/dist_fleet_ctr.py index bf99bb275db52..bed424f1532c0 100644 --- a/python/paddle/fluid/tests/unittests/dist_fleet_ctr.py +++ b/python/paddle/fluid/tests/unittests/dist_fleet_ctr.py @@ -209,14 +209,16 @@ def do_dataset_training(self, fleet): # config dataset dataset = paddle.distributed.fleet.DatasetFactory().create_dataset() + pipe_command = 'python ctr_dataset_reader.py' + ''' dataset.init( batch_size=batch_size, use_var=self.feeds, pipe_command=pipe_command, thread_num=thread_num) + ''' dataset._set_batch_size(batch_size) dataset._set_use_var(self.feeds) - pipe_command = 'python ctr_dataset_reader.py' dataset._set_pipe_command(pipe_command) dataset.set_filelist(filelist) From e8a601a98e75e522494a6f766f70b3b4346a9301 Mon Sep 17 00:00:00 2001 From: yaoxuefeng6 Date: Wed, 9 Sep 2020 15:32:29 +0800 Subject: [PATCH 07/14] update example code --- .../distributed/fleet/dataset/dataset.py | 222 +++++++++--------- 1 file changed, 114 insertions(+), 108 deletions(-) diff --git a/python/paddle/distributed/fleet/dataset/dataset.py b/python/paddle/distributed/fleet/dataset/dataset.py index ae4bbbd423f6d..71bfeaa0a1e5d 100644 --- a/python/paddle/distributed/fleet/dataset/dataset.py +++ b/python/paddle/distributed/fleet/dataset/dataset.py @@ -14,7 +14,7 @@ """This is definition of dataset class, which is high performance IO.""" import paddle -import paddle.fluid as fluid +import paddle from paddle.fluid.proto import data_feed_pb2 from google.protobuf import text_format import paddle.fluid.core as core @@ -29,8 +29,8 @@ class DatasetFactory(object): Example: .. code-block:: python - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") + import paddle + dataset = paddle.distributed.fleet.DatasetFactory().create_dataset("InMemoryDataset") """ @@ -50,8 +50,8 @@ def create_dataset(self, datafeed_class="QueueDataset"): Examples: .. code-block:: python - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset() + import paddle + dataset = paddle.distributed.fleet.DatasetFactory().create_dataset() """ try: @@ -100,9 +100,9 @@ def _set_pipe_command(self, pipe_command): Examples: .. code-block:: python - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset() - dataset.set_pipe_command("python my_script.py") + import paddle + dataset = paddle.distributed.fleet.DatasetFactory().create_dataset() + dataset._set_pipe_command("python my_script.py") Args: pipe_command(str): pipe command @@ -117,8 +117,8 @@ def _set_batch_size(self, batch_size): Examples: .. code-block:: python - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset() + import paddle + dataset = paddle.distributed.fleet.DatasetFactory().create_dataset() dataset._set_batch_size(128) Args: @@ -134,8 +134,8 @@ def _set_thread(self, thread_num): Examples: .. 
code-block:: python - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset() + import paddle + dataset = paddle.distributed.fleet.DatasetFactory().create_dataset() dataset._set_thread(12) Args: @@ -151,8 +151,8 @@ def set_filelist(self, filelist): Examples: .. code-block:: python - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset() + import paddle + dataset = paddle.distributed.fleet.DatasetFactory().create_dataset() dataset.set_filelist(['a.txt', 'b.txt']) Args: @@ -171,9 +171,9 @@ def _set_use_var(self, var_list): Examples: .. code-block:: python - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset() - dataset.set_use_var([data, label]) + import paddle + dataset = paddle.distributed.fleet.DatasetFactory().create_dataset() + dataset._set_use_var([data, label]) Args: var_list(list): variable list @@ -202,9 +202,9 @@ def _set_hdfs_config(self, fs_name, fs_ugi): Examples: .. code-block:: python - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset() - dataset.set_hdfs_config("my_fs_name", "my_fs_ugi") + import paddle + dataset = paddle.distributed.fleet.DatasetFactory().create_dataset() + dataset._set_hdfs_config("my_fs_name", "my_fs_ugi") Args: fs_name(str): fs name @@ -219,9 +219,9 @@ def _set_download_cmd(self, download_cmd): Examples: .. code-block:: python - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset() - dataset.set_download_cmd("./read_from_afs") + import paddle + dataset = paddle.distributed.fleet.DatasetFactory().create_dataset() + dataset._set_download_cmd("./read_from_afs") Args: download_cmd(str): customized download command @@ -249,8 +249,8 @@ def _desc(self): Examples: .. code-block:: python - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset() + import paddle + dataset = paddle.distributed.fleet.DatasetFactory().create_dataset() print(dataset._desc()) Returns: @@ -389,9 +389,9 @@ def _set_queue_num(self, queue_num): Examples: .. code-block:: python - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") - dataset.set_queue_num(12) + import paddle + dataset = paddle.distributed.fleet.DatasetFactory().create_dataset("InMemoryDataset") + dataset._set_queue_num(12) """ self.is_user_set_queue_num = True @@ -407,9 +407,9 @@ def _set_parse_ins_id(self, parse_ins_id): Examples: .. code-block:: python - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") - dataset.set_parse_ins_id(True) + import paddle + dataset = paddle.distributed.fleet.DatasetFactory().create_dataset("InMemoryDataset") + dataset._set_parse_ins_id(True) """ self.parse_ins_id = parse_ins_id @@ -424,9 +424,9 @@ def _set_parse_content(self, parse_content): Examples: .. code-block:: python - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") - dataset.set_parse_content(True) + import paddle + dataset = paddle.distributed.fleet.DatasetFactory().create_dataset("InMemoryDataset") + dataset._set_parse_content(True) """ self.parse_content = parse_content @@ -441,9 +441,9 @@ def _set_fleet_send_batch_size(self, fleet_send_batch_size=1024): Examples: .. 
code-block:: python - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") - dataset.set_fleet_send_batch_size(800) + import paddle + dataset = paddle.distributed.fleet.DatasetFactory().create_dataset("InMemoryDataset") + dataset._set_fleet_send_batch_size(800) """ self.fleet_send_batch_size = fleet_send_batch_size @@ -458,9 +458,9 @@ def _set_fleet_send_sleep_seconds(self, fleet_send_sleep_seconds=0): Examples: .. code-block:: python - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") - dataset.set_fleet_send_sleep_seconds(2) + import paddle + dataset = paddle.distributed.fleet.DatasetFactory().create_dataset("InMemoryDataset") + dataset._set_fleet_send_sleep_seconds(2) """ self.fleet_send_sleep_seconds = fleet_send_sleep_seconds @@ -476,9 +476,9 @@ def _set_merge_by_lineid(self, merge_size=2): Examples: .. code-block:: python - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") - dataset.set_merge_by_lineid() + import paddle + dataset = paddle.distributed.fleet.DatasetFactory().create_dataset("InMemoryDataset") + dataset._set_merge_by_lineid() """ self.dataset.set_merge_by_lineid(merge_size) @@ -502,8 +502,8 @@ def load_into_memory(self): Examples: .. code-block:: python - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") + import paddle + dataset = paddle.distributed.fleet.DatasetFactory().create_dataset("InMemoryDataset") filelist = ["a.txt", "b.txt"] dataset.set_filelist(filelist) dataset.load_into_memory() @@ -521,8 +521,8 @@ def preload_into_memory(self, thread_num=None): Examples: .. code-block:: python - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") + import paddle + dataset = paddle.distributed.fleet.DatasetFactory().create_dataset("InMemoryDataset") filelist = ["a.txt", "b.txt"] dataset.set_filelist(filelist) dataset.preload_into_memory() @@ -542,8 +542,8 @@ def wait_preload_done(self): Examples: .. code-block:: python - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") + import paddle + dataset = paddle.distributed.fleet.DatasetFactory().create_dataset("InMemoryDataset") filelist = ["a.txt", "b.txt"] dataset.set_filelist(filelist) dataset.preload_into_memory() @@ -559,8 +559,8 @@ def local_shuffle(self): Examples: .. code-block:: python - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") + import paddle + dataset = paddle.distributed.fleet.DatasetFactory().create_dataset("InMemoryDataset") filelist = ["a.txt", "b.txt"] dataset.set_filelist(filelist) dataset.load_into_memory() @@ -578,9 +578,9 @@ def global_shuffle(self, fleet=None, thread_num=12): Examples: .. code-block:: python - import paddle.fluid as fluid + import paddle from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet - dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") + dataset = paddle.distributed.fleet.DatasetFactory().create_dataset("InMemoryDataset") filelist = ["a.txt", "b.txt"] dataset.set_filelist(filelist) dataset.load_into_memory() @@ -622,9 +622,9 @@ def release_memory(self): Examples: .. 
code-block:: python - import paddle.fluid as fluid + import paddle from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet - dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") + dataset = paddle.distributed.fleet.DatasetFactory().create_dataset("InMemoryDataset") filelist = ["a.txt", "b.txt"] dataset.set_filelist(filelist) dataset.load_into_memory() @@ -654,9 +654,9 @@ def get_memory_data_size(self, fleet=None): Examples: .. code-block:: python - import paddle.fluid as fluid + import paddle from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet - dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") + dataset = paddle.distributed.fleet.DatasetFactory().create_dataset("InMemoryDataset") filelist = ["a.txt", "b.txt"] dataset.set_filelist(filelist) dataset.load_into_memory() @@ -691,9 +691,9 @@ def get_shuffle_data_size(self, fleet=None): Examples: .. code-block:: python - import paddle.fluid as fluid + import paddle from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet - dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") + dataset = paddle.distributed.fleet.DatasetFactory().create_dataset("InMemoryDataset") filelist = ["a.txt", "b.txt"] dataset.set_filelist(filelist) dataset.load_into_memory() @@ -725,9 +725,9 @@ def _set_fea_eval(self, record_candidate_size, fea_eval=True): Examples: .. code-block:: python - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") - dataset.set_fea_eval(1000000, True) + import paddle + dataset = paddle.distributed.fleet.DatasetFactory().create_dataset("InMemoryDataset") + dataset._set_fea_eval(1000000, True) """ if fea_eval: @@ -746,8 +746,8 @@ def slots_shuffle(self, slots): slots(list[string]): the set of slots(string) to do slots shuffle. Examples: - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") + import paddle + dataset = paddle.distributed.fleet.DatasetFactory().create_dataset("InMemoryDataset") dataset.set_merge_by_lineid() #suppose there is a slot 0 dataset.slots_shuffle(['0']) @@ -764,8 +764,8 @@ class QueueDataset(DatasetBase): Examples: .. code-block:: python - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset("QueueDataset") + import paddle + dataset = paddle.distributed.fleet.DatasetFactory().create_dataset("QueueDataset") """ @@ -807,8 +807,8 @@ def local_shuffle(self): Examples: .. code-block:: python - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset("QueueDataset") + import paddle + dataset = paddle.distributed.fleet.DatasetFactory().create_dataset("QueueDataset") dataset.local_shuffle() Raises: @@ -832,9 +832,9 @@ def global_shuffle(self, fleet=None): Examples: .. code-block:: python - import paddle.fluid as fluid + import paddle from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet - dataset = fluid.DatasetFactory().create_dataset("QueueDataset") + dataset = paddle.distributed.fleet.DatasetFactory().create_dataset("QueueDataset") dataset.global_shuffle(fleet) Raises: @@ -853,7 +853,7 @@ class FileInstantDataset(DatasetBase): Examples: .. 
code-block:: python - import paddle.fluid as fluid + import paddle dataset = fluid.DatasetFactory.create_dataset("FileInstantDataset") """ @@ -865,6 +865,12 @@ def __init__(self): super(FileInstantDataset, self).__init__() self.proto_desc.name = "MultiSlotFileInstantDataFeed" + def init(self, **kwargs): + """ + should be called only once in user's python scripts to initialize seetings of dataset instance + """ + super(FileInstantDataset, self).init(**kwargs) + def local_shuffle(self): """ Local shuffle @@ -891,8 +897,8 @@ class BoxPSDataset(InMemoryDataset): Examples: .. code-block:: python - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset("BoxPSDataset") + import paddle + dataset = paddle.distributed.fleet.DatasetFactory().create_dataset("BoxPSDataset") """ def __init__(self): @@ -928,9 +934,9 @@ def _set_rank_offset(self, rank_offset): Examples: .. code-block:: python - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset() - dataset.set_rank_offset("rank_offset") + import paddle + dataset = paddle.distributed.fleet.DatasetFactory().create_dataset() + dataset._set_rank_offset("rank_offset") Args: rank_offset(str): rank_offset's name @@ -945,9 +951,9 @@ def _set_pv_batch_size(self, pv_batch_size): Examples: .. code-block:: python - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset() - dataset.set_pv_batch(128) + import paddle + dataset = paddle.distributed.fleet.DatasetFactory().create_dataset() + dataset._set_pv_batch_size(128) Args: pv_batch_size(int): pv batch size @@ -964,9 +970,9 @@ def _set_parse_logkey(self, parse_logkey): Examples: .. code-block:: python - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") - dataset.set_parse_logkey(True) + import paddle + dataset = paddle.distributed.fleet.DatasetFactory().create_dataset("InMemoryDataset") + dataset._set_parse_logkey(True) """ self.parse_logkey = parse_logkey @@ -981,9 +987,9 @@ def _set_merge_by_sid(self, merge_by_sid): Examples: .. code-block:: python - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") - dataset.set_merge_by_sid(True) + import paddle + dataset = paddle.distributed.fleet.DatasetFactory().create_dataset("InMemoryDataset") + dataset._set_merge_by_sid(True) """ self.merge_by_sid = merge_by_sid @@ -998,9 +1004,9 @@ def _set_enable_pv_merge(self, enable_pv_merge): Examples: .. code-block:: python - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") - dataset.set_enable_pv_merge(True) + import paddle + dataset = paddle.distributed.fleet.DatasetFactory().create_dataset("InMemoryDataset") + dataset._set_enable_pv_merge(True) """ self.enable_pv_merge = enable_pv_merge @@ -1022,8 +1028,8 @@ def begin_pass(self): Examples: .. code-block:: python - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset("BoxPSDataset") + import paddle + dataset = paddle.distributed.fleet.DatasetFactory().create_dataset("BoxPSDataset") dataset.begin_pass() """ self.boxps.begin_pass() @@ -1035,8 +1041,8 @@ def end_pass(self, need_save_delta): Examples: .. code-block:: python - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset("BoxPSDataset") + import paddle + dataset = paddle.distributed.fleet.DatasetFactory().create_dataset("BoxPSDataset") dataset.end_pass(True) """ self.boxps.end_pass(need_save_delta) @@ -1048,8 +1054,8 @@ def wait_preload_done(self): Examples: .. 
code-block:: python - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset("BoxPSDataset") + import paddle + dataset = paddle.distributed.fleet.DatasetFactory().create_dataset("BoxPSDataset") filelist = ["a.txt", "b.txt"] dataset.set_filelist(filelist) dataset.preload_into_memory() @@ -1063,8 +1069,8 @@ def load_into_memory(self): Examples: .. code-block:: python - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset("BoxPSDataset") + import paddle + dataset = paddle.distributed.fleet.DatasetFactory().create_dataset("BoxPSDataset") filelist = ["a.txt", "b.txt"] dataset.set_filelist(filelist) dataset.load_into_memory() @@ -1078,8 +1084,8 @@ def preload_into_memory(self): Examples: .. code-block:: python - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset("BoxPSDataset") + import paddle + dataset = paddle.distributed.fleet.DatasetFactory().create_dataset("BoxPSDataset") filelist = ["a.txt", "b.txt"] dataset.set_filelist(filelist) dataset.preload_into_memory() @@ -1107,8 +1113,8 @@ def slots_shuffle(self, slots): slots(list[string]): the set of slots(string) to do slots shuffle. Examples: - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") + import paddle + dataset = paddle.distributed.fleet.DatasetFactory().create_dataset("InMemoryDataset") dataset.set_merge_by_lineid() #suppose there is a slot 0 dataset.slots_shuffle(['0']) @@ -1124,8 +1130,8 @@ def set_current_phase(self, current_phase): Examples: .. code-block:: python - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") + import paddle + dataset = paddle.distributed.fleet.DatasetFactory().create_dataset("InMemoryDataset") filelist = ["a.txt", "b.txt"] dataset.set_filelist(filelist) dataset.load_into_memory() @@ -1148,8 +1154,8 @@ def get_pv_data_size(self): Examples: .. code-block:: python - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") + import paddle + dataset = paddle.distributed.fleet.DatasetFactory().create_dataset("InMemoryDataset") filelist = ["a.txt", "b.txt"] dataset.set_filelist(filelist) dataset.load_into_memory() @@ -1166,8 +1172,8 @@ def preprocess_instance(self): Examples: .. code-block:: python - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") + import paddle + dataset = paddle.distributed.fleet.DatasetFactory().create_dataset("InMemoryDataset") filelist = ["a.txt", "b.txt"] dataset.set_filelist(filelist) dataset.load_into_memory() @@ -1183,8 +1189,8 @@ def postprocess_instance(self): Examples: .. 
code-block:: python - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") + import paddle + dataset = paddle.distributed.fleet.DatasetFactory().create_dataset("InMemoryDataset") filelist = ["a.txt", "b.txt"] dataset.set_filelist(filelist) dataset.load_into_memory() From b41a9598ed502afd39c6453ab7ee32c3ba29600a Mon Sep 17 00:00:00 2001 From: yaoxuefeng6 Date: Wed, 9 Sep 2020 15:38:38 +0800 Subject: [PATCH 08/14] update example --- python/paddle/distributed/fleet/dataset/dataset.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/python/paddle/distributed/fleet/dataset/dataset.py b/python/paddle/distributed/fleet/dataset/dataset.py index 71bfeaa0a1e5d..d170a3a185d28 100644 --- a/python/paddle/distributed/fleet/dataset/dataset.py +++ b/python/paddle/distributed/fleet/dataset/dataset.py @@ -192,7 +192,7 @@ def _set_use_var(self, var_list): slot_var.type = "uint64" else: raise ValueError( - "Currently, fluid.dataset only supports dtype=float32 and dtype=int64" + "Currently, paddle.distributed.fleet.dataset only supports dtype=float32 and dtype=int64" ) def _set_hdfs_config(self, fs_name, fs_ugi): @@ -272,7 +272,8 @@ class InMemoryDataset(DatasetBase): This class should be created by DatasetFactory Example: - dataset = paddle.fluid.DatasetFactory().create_dataset("InMemoryDataset") + import paddle + dataset = paddle.distributed.fleet.DatasetFactory().create_dataset("InMemoryDataset") """ def __init__(self): @@ -854,7 +855,7 @@ class FileInstantDataset(DatasetBase): .. code-block:: python import paddle - dataset = fluid.DatasetFactory.create_dataset("FileInstantDataset") + dataset = paddle.distributed.fleet.DatasetFactory.create_dataset("FileInstantDataset") """ def __init__(self): From 138cff4499567b36b514fe879b6a9d3b60381188 Mon Sep 17 00:00:00 2001 From: yaoxuefeng6 Date: Wed, 9 Sep 2020 15:50:57 +0800 Subject: [PATCH 09/14] fix --- python/paddle/distributed/fleet/dataset/dataset.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/python/paddle/distributed/fleet/dataset/dataset.py b/python/paddle/distributed/fleet/dataset/dataset.py index d170a3a185d28..a18f5c03bf270 100644 --- a/python/paddle/distributed/fleet/dataset/dataset.py +++ b/python/paddle/distributed/fleet/dataset/dataset.py @@ -13,7 +13,6 @@ # limitations under the License. 
"""This is definition of dataset class, which is high performance IO.""" -import paddle import paddle from paddle.fluid.proto import data_feed_pb2 from google.protobuf import text_format @@ -293,7 +292,7 @@ def __init__(self): def init(self, **kwargs): """ - should be called only once in user's python scripts to initialize seetings of dataset instance + should be called only once in user's python scripts to initialize setings of dataset instance """ batch_size = kwargs.get("batch_size", 1) thread_num = kwargs.get("thread_num", 1) @@ -780,7 +779,7 @@ def __init__(self): def init(self, **kwargs): """ - should be called only once in user's python scripts to initialize seetings of dataset instance + should be called only once in user's python scripts to initialize setings of dataset instance """ super(QueueDataset, self).init(**kwargs) @@ -868,7 +867,7 @@ def __init__(self): def init(self, **kwargs): """ - should be called only once in user's python scripts to initialize seetings of dataset instance + should be called only once in user's python scripts to initialize setings of dataset instance """ super(FileInstantDataset, self).init(**kwargs) @@ -913,7 +912,7 @@ def __init__(self): def init(self, **kwargs): """ - should be called only once in user's python scripts to initialize seetings of dataset instance + should be called only once in user's python scripts to initialize setings of dataset instance """ super(BoxPSDataset, self).init(**kwargs) From 6a5289db56302031e2cf745d38a9fd36e631d097 Mon Sep 17 00:00:00 2001 From: yaoxuefeng6 Date: Wed, 9 Sep 2020 23:55:29 +0800 Subject: [PATCH 10/14] remove factory dataset class in distributed.fleet, add InMemory and Queue dataset in distributed --- python/paddle/distributed/__init__.py | 8 +- python/paddle/distributed/fleet/__init__.py | 1 - .../distributed/fleet/dataset/dataset.py | 136 ++++++------------ .../fluid/tests/unittests/dist_fleet_ctr.py | 2 +- .../tests/unittests/dist_fleet_ctr_ps_gpu.py | 2 +- .../tests/unittests/dist_fleet_heter_ctr.py | 2 +- .../fluid/tests/unittests/test_dataset.py | 63 +++----- .../unittests/test_dataset_dataloader.py | 6 +- .../tests/unittests/test_fleet_rolemaker_2.py | 3 +- .../fluid/tests/unittests/test_monitor.py | 3 +- 10 files changed, 78 insertions(+), 148 deletions(-) diff --git a/python/paddle/distributed/__init__.py b/python/paddle/distributed/__init__.py index b7357eef7ad9a..27c8222731630 100644 --- a/python/paddle/distributed/__init__.py +++ b/python/paddle/distributed/__init__.py @@ -21,6 +21,7 @@ from .parallel import get_world_size from paddle.fluid.dygraph.parallel import prepare_context #DEFINE_ALIAS from paddle.fluid.dygraph.parallel import ParallelEnv #DEFINE_ALIAS +from paddle.distributed.fleet.dataset import * from . 
import collective from .collective import * @@ -30,11 +31,8 @@ # dygraph parallel apis __all__ += [ - "init_parallel_env", - "get_rank", - "get_world_size", - "prepare_context", - "ParallelEnv", + "init_parallel_env", "get_rank", "get_world_size", "prepare_context", + "ParallelEnv", "InMemoryDataset", "QueueDataset" ] # collective apis diff --git a/python/paddle/distributed/fleet/__init__.py b/python/paddle/distributed/fleet/__init__.py index 5f0cf9f93d62e..2539fa57a34b1 100644 --- a/python/paddle/distributed/fleet/__init__.py +++ b/python/paddle/distributed/fleet/__init__.py @@ -23,7 +23,6 @@ __all__ = [ "DistributedStrategy", "UtilBase", - "DatasetFactory", "UserDefinedRoleMaker", "PaddleCloudRoleMaker", "Fleet", diff --git a/python/paddle/distributed/fleet/dataset/dataset.py b/python/paddle/distributed/fleet/dataset/dataset.py index a18f5c03bf270..04390e0b2d77f 100644 --- a/python/paddle/distributed/fleet/dataset/dataset.py +++ b/python/paddle/distributed/fleet/dataset/dataset.py @@ -19,48 +19,6 @@ import paddle.fluid.core as core -class DatasetFactory(object): - """ - DatasetFactory is a factory which create dataset by its name, - you can create "QueueDataset" or "InMemoryDataset", or "FileInstantDataset", - the default is "QueueDataset". - - Example: - .. code-block:: python - - import paddle - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset("InMemoryDataset") - - """ - - def __init__(self): - """ Init. """ - pass - - def create_dataset(self, datafeed_class="QueueDataset"): - """ - Create "QueueDataset" or "InMemoryDataset", or "FileInstantDataset", - the default is "QueueDataset". - - Args: - datafeed_class(str): datafeed class name, QueueDataset or InMemoryDataset. - Default is QueueDataset. - - Examples: - .. code-block:: python - - import paddle - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset() - - """ - try: - dataset = globals()[datafeed_class]() - return dataset - except: - raise ValueError("datafeed class %s does not exist" % - datafeed_class) - - class DatasetBase(object): """ Base dataset class. """ @@ -100,7 +58,7 @@ def _set_pipe_command(self, pipe_command): .. code-block:: python import paddle - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset() + dataset = paddle.distributed.fleet.dataset.DatasetBase() dataset._set_pipe_command("python my_script.py") Args: @@ -117,7 +75,7 @@ def _set_batch_size(self, batch_size): .. code-block:: python import paddle - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset() + dataset = paddle.distributed.fleet.DatasetBase() dataset._set_batch_size(128) Args: @@ -134,7 +92,7 @@ def _set_thread(self, thread_num): .. code-block:: python import paddle - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset() + dataset = paddle.distributed.fleet.DatasetBase() dataset._set_thread(12) Args: @@ -151,7 +109,7 @@ def set_filelist(self, filelist): .. code-block:: python import paddle - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset() + dataset = paddle.distributed.fleet.DatasetBase() dataset.set_filelist(['a.txt', 'b.txt']) Args: @@ -171,7 +129,7 @@ def _set_use_var(self, var_list): .. code-block:: python import paddle - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset() + dataset = paddle.distributed.fleet.DatasetBase() dataset._set_use_var([data, label]) Args: @@ -202,7 +160,7 @@ def _set_hdfs_config(self, fs_name, fs_ugi): .. 
code-block:: python import paddle - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset() + dataset = paddle.distributed.fleet.DatasetBase() dataset._set_hdfs_config("my_fs_name", "my_fs_ugi") Args: @@ -219,7 +177,7 @@ def _set_download_cmd(self, download_cmd): .. code-block:: python import paddle - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset() + dataset = paddle.distributed.fleet.DatasetBase() dataset._set_download_cmd("./read_from_afs") Args: @@ -249,7 +207,7 @@ def _desc(self): .. code-block:: python import paddle - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset() + dataset = paddle.distributed.fleet.DatasetBase() print(dataset._desc()) Returns: @@ -268,11 +226,10 @@ class InMemoryDataset(DatasetBase): """ InMemoryDataset, it will load data into memory and shuffle data before training. - This class should be created by DatasetFactory Example: import paddle - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset("InMemoryDataset") + dataset = paddle.distributed.InMemoryDataset() """ def __init__(self): @@ -390,7 +347,7 @@ def _set_queue_num(self, queue_num): .. code-block:: python import paddle - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset("InMemoryDataset") + dataset = paddle.distributed.InMemoryDataset() dataset._set_queue_num(12) """ @@ -408,7 +365,7 @@ def _set_parse_ins_id(self, parse_ins_id): .. code-block:: python import paddle - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset("InMemoryDataset") + dataset = paddle.distributed.InMemoryDataset() dataset._set_parse_ins_id(True) """ @@ -425,7 +382,7 @@ def _set_parse_content(self, parse_content): .. code-block:: python import paddle - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset("InMemoryDataset") + dataset = paddle.distributed.InMemoryDataset() dataset._set_parse_content(True) """ @@ -442,7 +399,7 @@ def _set_fleet_send_batch_size(self, fleet_send_batch_size=1024): .. code-block:: python import paddle - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset("InMemoryDataset") + dataset = paddle.distributed.InMemoryDataset() dataset._set_fleet_send_batch_size(800) """ @@ -459,7 +416,7 @@ def _set_fleet_send_sleep_seconds(self, fleet_send_sleep_seconds=0): .. code-block:: python import paddle - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset("InMemoryDataset") + dataset = paddle.distributed.InMemoryDataset() dataset._set_fleet_send_sleep_seconds(2) """ @@ -477,7 +434,7 @@ def _set_merge_by_lineid(self, merge_size=2): .. code-block:: python import paddle - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset("InMemoryDataset") + dataset = paddle.distributed.InMemoryDataset() dataset._set_merge_by_lineid() """ @@ -503,7 +460,7 @@ def load_into_memory(self): .. code-block:: python import paddle - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset("InMemoryDataset") + dataset = paddle.distributed.InMemoryDataset() filelist = ["a.txt", "b.txt"] dataset.set_filelist(filelist) dataset.load_into_memory() @@ -522,7 +479,7 @@ def preload_into_memory(self, thread_num=None): .. code-block:: python import paddle - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset("InMemoryDataset") + dataset = paddle.distributed.InMemoryDataset() filelist = ["a.txt", "b.txt"] dataset.set_filelist(filelist) dataset.preload_into_memory() @@ -543,7 +500,7 @@ def wait_preload_done(self): .. 
code-block:: python import paddle - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset("InMemoryDataset") + dataset = paddle.distributed.InMemoryDataset() filelist = ["a.txt", "b.txt"] dataset.set_filelist(filelist) dataset.preload_into_memory() @@ -560,7 +517,7 @@ def local_shuffle(self): .. code-block:: python import paddle - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset("InMemoryDataset") + dataset = paddle.distributed.InMemoryDataset() filelist = ["a.txt", "b.txt"] dataset.set_filelist(filelist) dataset.load_into_memory() @@ -580,7 +537,7 @@ def global_shuffle(self, fleet=None, thread_num=12): import paddle from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset("InMemoryDataset") + dataset = paddle.distributed.InMemoryDataset() filelist = ["a.txt", "b.txt"] dataset.set_filelist(filelist) dataset.load_into_memory() @@ -624,7 +581,7 @@ def release_memory(self): import paddle from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset("InMemoryDataset") + dataset = paddle.distributed.InMemoryDataset() filelist = ["a.txt", "b.txt"] dataset.set_filelist(filelist) dataset.load_into_memory() @@ -656,7 +613,7 @@ def get_memory_data_size(self, fleet=None): import paddle from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset("InMemoryDataset") + dataset = paddle.distributed.InMemoryDataset() filelist = ["a.txt", "b.txt"] dataset.set_filelist(filelist) dataset.load_into_memory() @@ -693,7 +650,7 @@ def get_shuffle_data_size(self, fleet=None): import paddle from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset("InMemoryDataset") + dataset = paddle.distributed.InMemoryDataset() filelist = ["a.txt", "b.txt"] dataset.set_filelist(filelist) dataset.load_into_memory() @@ -726,7 +683,7 @@ def _set_fea_eval(self, record_candidate_size, fea_eval=True): .. code-block:: python import paddle - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset("InMemoryDataset") + dataset = paddle.distributed.InMemoryDataset() dataset._set_fea_eval(1000000, True) """ @@ -747,7 +704,7 @@ def slots_shuffle(self, slots): Examples: import paddle - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset("InMemoryDataset") + dataset = paddle.distributed.InMemoryDataset() dataset.set_merge_by_lineid() #suppose there is a slot 0 dataset.slots_shuffle(['0']) @@ -765,14 +722,13 @@ class QueueDataset(DatasetBase): .. code-block:: python import paddle - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset("QueueDataset") + dataset = paddle.distributed.QueueDataset() """ def __init__(self): """ Initialize QueueDataset - This class should be created by DatasetFactory """ super(QueueDataset, self).__init__() self.proto_desc.name = "MultiSlotDataFeed" @@ -808,7 +764,7 @@ def local_shuffle(self): .. 
code-block:: python import paddle - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset("QueueDataset") + dataset = paddle.distributed.QueueDataset() dataset.local_shuffle() Raises: @@ -834,7 +790,7 @@ def global_shuffle(self, fleet=None): import paddle from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset("QueueDataset") + dataset = paddle.distributed.QueueDataset() dataset.global_shuffle(fleet) Raises: @@ -854,13 +810,12 @@ class FileInstantDataset(DatasetBase): .. code-block:: python import paddle - dataset = paddle.distributed.fleet.DatasetFactory.create_dataset("FileInstantDataset") + dataset = paddle.distributed.fleet.FileInstantDataset() """ def __init__(self): """ Initialize FileInstantDataset - This class should be created by DatasetFactory """ super(FileInstantDataset, self).__init__() self.proto_desc.name = "MultiSlotFileInstantDataFeed" @@ -898,13 +853,12 @@ class BoxPSDataset(InMemoryDataset): .. code-block:: python import paddle - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset("BoxPSDataset") + dataset = paddle.distributed.fleet.BoxPSDataset() """ def __init__(self): """ Initialize BoxPSDataset - This class should be created by DatasetFactory """ super(BoxPSDataset, self).__init__() self.boxps = core.BoxPS(self.dataset) @@ -935,7 +889,7 @@ def _set_rank_offset(self, rank_offset): .. code-block:: python import paddle - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset() + dataset = paddle.distributed.fleet.BoxPSDataset() dataset._set_rank_offset("rank_offset") Args: @@ -952,7 +906,7 @@ def _set_pv_batch_size(self, pv_batch_size): .. code-block:: python import paddle - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset() + dataset = paddle.distributed.fleet.BoxPSDataset() dataset._set_pv_batch_size(128) Args: pv_batch_size(int): pv batch size @@ -971,7 +925,7 @@ def _set_parse_logkey(self, parse_logkey): .. code-block:: python import paddle - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset("InMemoryDataset") + dataset = paddle.distributed.fleet.BoxPSDataset() dataset._set_parse_logkey(True) """ @@ -988,7 +942,7 @@ def _set_merge_by_sid(self, merge_by_sid): .. code-block:: python import paddle - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset("InMemoryDataset") + dataset = paddle.distributed.fleet.BoxPSDataset() dataset._set_merge_by_sid(True) """ @@ -1005,7 +959,7 @@ def _set_enable_pv_merge(self, enable_pv_merge): .. code-block:: python import paddle - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset("InMemoryDataset") + dataset = paddle.distributed.fleet.BoxPSDataset() dataset._set_enable_pv_merge(True) """ @@ -1029,7 +983,7 @@ def begin_pass(self): .. code-block:: python import paddle - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset("BoxPSDataset") + dataset = paddle.distributed.fleet.BoxPSDataset() dataset.begin_pass() """ self.boxps.begin_pass() @@ -1042,7 +996,7 @@ def end_pass(self, need_save_delta): .. code-block:: python import paddle - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset("BoxPSDataset") + dataset = paddle.distributed.fleet.BoxPSDataset() dataset.end_pass(True) """ self.boxps.end_pass(need_save_delta) @@ -1055,7 +1009,7 @@ def wait_preload_done(self): .. 
code-block:: python import paddle - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset("BoxPSDataset") + dataset = paddle.distributed.fleet.BoxPSDataset() filelist = ["a.txt", "b.txt"] dataset.set_filelist(filelist) dataset.preload_into_memory() @@ -1070,7 +1024,7 @@ def load_into_memory(self): .. code-block:: python import paddle - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset("BoxPSDataset") + dataset = paddle.distributed.fleet.BoxPSDataset() filelist = ["a.txt", "b.txt"] dataset.set_filelist(filelist) dataset.load_into_memory() @@ -1085,7 +1039,7 @@ def preload_into_memory(self): .. code-block:: python import paddle - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset("BoxPSDataset") + dataset = paddle.distributed.fleet.BoxPSDataset() filelist = ["a.txt", "b.txt"] dataset.set_filelist(filelist) dataset.preload_into_memory() @@ -1114,7 +1068,7 @@ def slots_shuffle(self, slots): Examples: import paddle - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset("InMemoryDataset") + dataset = paddle.distributed.fleet.BoxPSDataset() dataset.set_merge_by_lineid() #suppose there is a slot 0 dataset.slots_shuffle(['0']) @@ -1131,7 +1085,7 @@ def set_current_phase(self, current_phase): .. code-block:: python import paddle - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset("InMemoryDataset") + dataset = paddle.distributed.fleet.BoxPSDataset() filelist = ["a.txt", "b.txt"] dataset.set_filelist(filelist) dataset.load_into_memory() @@ -1155,7 +1109,7 @@ def get_pv_data_size(self): .. code-block:: python import paddle - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset("InMemoryDataset") + dataset = paddle.distributed.fleet.BoxPSDataset() filelist = ["a.txt", "b.txt"] dataset.set_filelist(filelist) dataset.load_into_memory() @@ -1173,7 +1127,7 @@ def preprocess_instance(self): .. code-block:: python import paddle - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset("InMemoryDataset") + dataset = paddle.distributed.fleet.BoxPSDataset() filelist = ["a.txt", "b.txt"] dataset.set_filelist(filelist) dataset.load_into_memory() @@ -1190,7 +1144,7 @@ def postprocess_instance(self): .. 
code-block:: python import paddle - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset("InMemoryDataset") + dataset = paddle.distributed.fleet.BoxPSDataset() filelist = ["a.txt", "b.txt"] dataset.set_filelist(filelist) dataset.load_into_memory() diff --git a/python/paddle/fluid/tests/unittests/dist_fleet_ctr.py b/python/paddle/fluid/tests/unittests/dist_fleet_ctr.py index bed424f1532c0..1dda5a0ff87c1 100644 --- a/python/paddle/fluid/tests/unittests/dist_fleet_ctr.py +++ b/python/paddle/fluid/tests/unittests/dist_fleet_ctr.py @@ -208,7 +208,7 @@ def do_dataset_training(self, fleet): filelist = train_file_list # config dataset - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset() + dataset = paddle.distributed.QueueDataset() pipe_command = 'python ctr_dataset_reader.py' ''' dataset.init( diff --git a/python/paddle/fluid/tests/unittests/dist_fleet_ctr_ps_gpu.py b/python/paddle/fluid/tests/unittests/dist_fleet_ctr_ps_gpu.py index 4318db31b9a5c..0e3c809927714 100644 --- a/python/paddle/fluid/tests/unittests/dist_fleet_ctr_ps_gpu.py +++ b/python/paddle/fluid/tests/unittests/dist_fleet_ctr_ps_gpu.py @@ -114,7 +114,7 @@ def do_dataset_training(self, fleet): filelist.append(train_file_path) # config dataset - dataset = paddle.fleet.DatasetFactory().create_dataset() + dataset = paddle.distributed.QueueDataset() dataset._set_batch_size(batch_size) dataset._set_use_var(self.feeds) pipe_command = 'python ctr_dataset_reader.py' diff --git a/python/paddle/fluid/tests/unittests/dist_fleet_heter_ctr.py b/python/paddle/fluid/tests/unittests/dist_fleet_heter_ctr.py index 636a6c40660b8..a5633bb0450f9 100644 --- a/python/paddle/fluid/tests/unittests/dist_fleet_heter_ctr.py +++ b/python/paddle/fluid/tests/unittests/dist_fleet_heter_ctr.py @@ -183,7 +183,7 @@ def do_dataset_training(self, fleet): print("filelist: {}".format(filelist)) # config dataset - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset() + dataset = paddle.distributed.QueueDataset() dataset._set_batch_size(batch_size) dataset._set_use_var(self.feeds) pipe_command = 'python ctr_dataset_reader.py' diff --git a/python/paddle/fluid/tests/unittests/test_dataset.py b/python/paddle/fluid/tests/unittests/test_dataset.py index 91f3eaa8ac897..ec0e4cc44b4f4 100644 --- a/python/paddle/fluid/tests/unittests/test_dataset.py +++ b/python/paddle/fluid/tests/unittests/test_dataset.py @@ -38,26 +38,22 @@ def setUp(self): def test_dataset_create(self): """ Testcase for dataset create. 
""" try: - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( - "InMemoryDataset") + dataset = paddle.distributed.InMemoryDataset() except: self.assertTrue(False) try: - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( - "QueueDataset") + dataset = paddle.distributed.QueueDataset() except: self.assertTrue(False) try: - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( - "FileInstantDataset") + dataset = paddle.distributed.fleet.dataset.FileInstantDataset() except: self.assertTrue(False) try: - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( - "MyOwnDataset") + dataset = paddle.distributed.fleet.dataset.MyOwnDataset() self.assertTrue(False) except: self.assertTrue(True) @@ -95,8 +91,7 @@ def test_run_with_dump(self): name=slot, shape=[1], dtype="int64", lod_level=1) slots_vars.append(var) - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( - "InMemoryDataset") + dataset = paddle.distributed.InMemoryDataset() dataset.init( batch_size=32, thread_num=3, @@ -178,8 +173,7 @@ def test_set_download_cmd(self): name=slot, shape=[1], dtype="int64", lod_level=1) slots_vars.append(var) - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( - "InMemoryDataset") + dataset = paddle.distributed.InMemoryDataset() dataset.init( batch_size=32, thread_num=3, @@ -231,8 +225,7 @@ def test_in_memory_dataset_run(self): name=slot, shape=[1], dtype="int64", lod_level=1) slots_vars.append(var) - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( - "InMemoryDataset") + dataset = paddle.distributed.InMemoryDataset() dataset.init( batch_size=32, thread_num=3, @@ -305,8 +298,7 @@ def test_in_memory_dataset_masterpatch(self): name=slot, shape=[1], dtype="float32", lod_level=1) slots_vars.append(var) - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( - "InMemoryDataset") + dataset = paddle.distributed.InMemoryDataset() dataset.init( batch_size=32, thread_num=1, @@ -373,8 +365,7 @@ def test_in_memory_dataset_masterpatch1(self): name="slot4", shape=[1], dtype="float32", lod_level=0) slots_vars = [var1, var2, var3, var4] - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( - "InMemoryDataset") + dataset = paddle.distributed.InMemoryDataset() dataset.init( batch_size=32, thread_num=1, @@ -430,8 +421,7 @@ def test_in_memory_dataset_run_2(self): name=slot, shape=[1], dtype="float32", lod_level=1) slots_vars.append(var) - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( - "InMemoryDataset") + dataset = paddle.distributed.InMemoryDataset() dataset.init( batch_size=32, thread_num=3, pipe_command="cat", use_var=slots_vars) dataset.set_filelist([ @@ -522,8 +512,7 @@ def test_queue_dataset_run(self): name=slot, shape=[1], dtype="int64", lod_level=1) slots_vars.append(var) - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( - "QueueDataset") + dataset = paddle.distributed.QueueDataset() dataset.init( batch_size=32, thread_num=3, pipe_command="cat", use_var=slots_vars) dataset.set_filelist( @@ -546,8 +535,7 @@ def test_queue_dataset_run(self): except Exception as e: self.assertTrue(False) - dataset2 = paddle.distributed.fleet.DatasetFactory().create_dataset( - "QueueDataset") + dataset2 = paddle.distributed.QueueDataset() dataset2.init( batch_size=32, thread_num=3, pipe_command="cat", use_var=slots_vars) dataset.set_filelist([]) @@ -586,8 +574,7 @@ def test_queue_dataset_run_2(self): name=slot, shape=[1], dtype="float32", lod_level=1) 
slots_vars.append(var) - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( - "QueueDataset") + dataset = paddle.distributed.QueueDataset() dataset.init( batch_size=32, thread_num=3, pipe_command="cat", use_var=slots_vars) dataset.set_filelist( @@ -640,8 +627,7 @@ def test_queue_dataset_run_3(self): name=slot, shape=[None, 1], dtype="int64", lod_level=1) slots_vars.append(var) - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( - "InMemoryDataset") + dataset = paddle.distributed.InMemoryDataset() dataset.init( batch_size=1, thread_num=2, @@ -721,8 +707,7 @@ def get_dataset(self, inputs, files): inputs(list): inputs of get_dataset files(list): files of get_dataset """ - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( - "QueueDataset") + dataset = paddle.distributed.QueueDataset() dataset.init( batch_size=32, thread_num=3, pipe_command="cat", use_var=inputs) dataset.set_filelist(files) @@ -877,8 +862,7 @@ def test_dataset_fleet(self): except ImportError as e: print("warning: no mpi4py") exe.run(startup_program) - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( - "InMemoryDataset") + dataset = paddle.distributed.InMemoryDataset() dataset.init( batch_size=32, @@ -949,8 +933,7 @@ def test_dataset_fleet2(self): except ImportError as e: print("warning: no mpi4py") exe.run(startup_program) - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( - "InMemoryDataset") + dataset = paddle.distributed.InMemoryDataset() dataset.init( batch_size=32, thread_num=3, @@ -967,8 +950,7 @@ def test_dataset_fleet2(self): print("warning: catch expected error") fleet._opt_info = None fleet._fleet_ptr = None - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( - "InMemoryDataset") + dataset = paddle.distributed.InMemoryDataset() dataset.init(fs_name="", fs_ugi="") d = paddle.distributed.fleet.DatasetBase() try: @@ -1000,8 +982,7 @@ def test_dataset_fleet2(self): #dataset.get_pv_data_size() dataset.get_memory_data_size() dataset.get_shuffle_data_size() - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( - "QueueDataset") + dataset = paddle.distributed.QueueDataset() try: dataset.local_shuffle() except: @@ -1076,8 +1057,7 @@ def test_bosps_dataset_fleet2(self): except ImportError as e: print("warning: no mpi4py") exe.run(startup_program) - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( - "BoxPSDataset") + dataset = paddle.distributed.fleet.BoxPSDataset() dataset.init( batch_size=32, thread_num=3, @@ -1094,8 +1074,7 @@ def test_bosps_dataset_fleet2(self): print("warning: catch expected error") fleet._opt_info = None fleet._fleet_ptr = None - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( - "BoxPSDataset") + dataset = paddle.distributed.fleet.BoxPSDataset() dataset.init( rank_offset="", pv_batch_size=1, diff --git a/python/paddle/fluid/tests/unittests/test_dataset_dataloader.py b/python/paddle/fluid/tests/unittests/test_dataset_dataloader.py index a4494e88aa9d6..9195ac277b93a 100644 --- a/python/paddle/fluid/tests/unittests/test_dataset_dataloader.py +++ b/python/paddle/fluid/tests/unittests/test_dataset_dataloader.py @@ -97,8 +97,10 @@ def build_network(self): def check_batch_number(self, place, randomize_batch_num=False): main_prog, startup_prog, feeds = self.build_network() - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( - self.dataset_name) + if self.dataset_name == "QueueDataset": + dataset = paddle.distributed.QueueDataset() + 
else:
+            dataset = paddle.distributed.InMemoryDataset()
         dataset._set_batch_size(BATCH_SIZE)

         if isinstance(place, fluid.CPUPlace):
diff --git a/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_2.py b/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_2.py
index 722949934eb34..a831f6e838e95 100644
--- a/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_2.py
+++ b/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_2.py
@@ -163,8 +163,7 @@ def test_pslib_2(self):
             data = "1 1 1 1\n"
             f.write(data)

-        dataset = paddle.distributed.fleet.DatasetFactory().create_dataset(
-            "InMemoryDataset")
+        dataset = paddle.distributed.InMemoryDataset()
         dataset.set_filelist(["test_fleet_gloo_role_maker_1.txt"])
         dataset._set_use_var([show, label])
         dataset.load_into_memory()
diff --git a/python/paddle/fluid/tests/unittests/test_monitor.py b/python/paddle/fluid/tests/unittests/test_monitor.py
index 9dd1092976bf3..cf273876b1f2f 100644
--- a/python/paddle/fluid/tests/unittests/test_monitor.py
+++ b/python/paddle/fluid/tests/unittests/test_monitor.py
@@ -52,8 +52,7 @@ def test_dataset_run_with_stat(self):
                 name=slot, shape=[1], dtype="int64", lod_level=1)
             slots_vars.append(var)

-        dataset = paddle.distributed.fleet.DatasetFactory().create_dataset(
-            "InMemoryDataset")
+        dataset = paddle.distributed.InMemoryDataset()
         dataset._set_batch_size(32)
         dataset._set_thread(3)
         dataset.set_filelist([
From 98cd361a4c2c4042ce33987dd75151db366556b8 Mon Sep 17 00:00:00 2001
From: yaoxuefeng6
Date: Sat, 12 Sep 2020 13:57:14 +0800
Subject: [PATCH 11/14] add distributed_settings and update_settings

---
 .../distributed/fleet/dataset/dataset.py      |  82 ++++++++++++++-----
 .../fluid/tests/unittests/dist_fleet_ctr.py   |   7 +-
 .../fluid/tests/unittests/test_dataset.py     |  47 ++++++-----
 3 files changed, 87 insertions(+), 49 deletions(-)

diff --git a/python/paddle/distributed/fleet/dataset/dataset.py b/python/paddle/distributed/fleet/dataset/dataset.py
index 04390e0b2d77f..0fe150dbfb4dd 100644
--- a/python/paddle/distributed/fleet/dataset/dataset.py
+++ b/python/paddle/distributed/fleet/dataset/dataset.py
@@ -247,6 +247,66 @@ def __init__(self):
         self.merge_by_lineid = False
         self.fleet_send_sleep_seconds = None

+    def init_distributed_settings(self, **kwargs):
+        """
+        should be called only once in user's python scripts to initialize distributed-related settings of dataset instance
+        """
+        merge_size = kwargs.get("merge_size", -1)
+        if merge_size > 0:
+            self._set_merge_by_lineid(merge_size)
+
+        parse_ins_id = kwargs.get("parse_ins_id", False)
+        self._set_parse_ins_id(parse_ins_id)
+
+        parse_content = kwargs.get("parse_content", False)
+        self._set_parse_content(parse_content)
+
+        fleet_send_batch_size = kwargs.get("fleet_send_batch_size", None)
+        if fleet_send_batch_size:
+            self._set_fleet_send_batch_size(fleet_send_batch_size)
+
+        fleet_send_sleep_seconds = kwargs.get("fleet_send_sleep_seconds", None)
+        if fleet_send_sleep_seconds:
+            self._set_fleet_send_sleep_seconds(fleet_send_sleep_seconds)
+
+        fea_eval = kwargs.get("fea_eval", False)
+        if fea_eval:
+            candidate_size = kwargs.get("candidate_size", 10000)
+            self._set_fea_eval(candidate_size, True)
+
+    def update_settings(self, **kwargs):
+        """
+        should be called in user's python scripts to update settings of dataset instance
+        """
+        for key in kwargs:
+            if key == "pipe_command":
+                self._set_pipe_command(kwargs[key])
+            elif key == "batch_size":
+                self._set_batch_size(kwargs[key])
+            elif key == "thread_num":
+                self._set_thread(kwargs[key])
+            elif key == "use_var":
+ 
self._set_use_var(kwargs[key]) + elif key == "input_type": + self._set_input_type(kwargs[key]) + elif key == "fs_name" and "fs_ugi" in kwargs: + self._set_hdfs_config(kwargs[key], kwargs["fs_ugi"]) + elif key == "download_cmd": + self._set_download_cmd(kwargs[key]) + elif key == "merge_size" and kwargs.get("merge_size", -1) > 0: + self._set_merge_by_lineid(kwargs[key]) + elif key == "parse_ins_id": + self._set_parse_ins_id(kwargs[key]) + elif key == "parse_content": + self._set_parse_content(kwargs[key]) + elif key == "fleet_send_batch_size": + self._set_fleet_send_batch_size(kwargs[key]) + elif key == "fleet_send_sleep_seconds": + self._set_fleet_send_sleep_seconds(kwargs[key]) + elif key == "fea_eval" and kwargs[key] == True: + candidate_size = kwargs.get("candidate_size", 10000) + self._set_fea_eval(candidate_size, True) + def init(self, **kwargs): """ should be called only once in user's python scripts to initialize setings of dataset instance @@ -277,28 +337,6 @@ def init(self, **kwargs): if kwargs.get("queue_num", -1) > 0: queue_num = kwargs.get("queue_num", -1) self._set_queue_num(queue_num) - merge_size = kwargs.get("merge_size", -1) - if merge_size > 0: - self._set_merge_by_lineid(merge_size) - - parse_ins_id = kwargs.get("parse_ins_id", False) - self._set_parse_ins_id(parse_ins_id) - - parse_content = kwargs.get("parse_content", False) - self._set_parse_content(parse_content) - - fleet_send_batch_size = kwargs.get("fleet_send_batch_size", None) - if fleet_send_batch_size: - self._set_fleet_send_batch_size(fleet_send_batch_size) - - fleet_send_sleep_seconds = kwargs.get("fleet_send_sleep_seconds", None) - if fleet_send_sleep_seconds: - self._set_fleet_send_sleep_seconds(fleet_send_sleep_seconds) - - fea_eval = kwargs.get("fea_eval", False) - if fea_eval: - candidate_size = kwargs.get("candidate_size", 10000) - self._set_fea_eval(candidate_size, True) def _set_feed_type(self, data_feed_type): """ diff --git a/python/paddle/fluid/tests/unittests/dist_fleet_ctr.py b/python/paddle/fluid/tests/unittests/dist_fleet_ctr.py index 1dda5a0ff87c1..1b0ce0c03e7c6 100644 --- a/python/paddle/fluid/tests/unittests/dist_fleet_ctr.py +++ b/python/paddle/fluid/tests/unittests/dist_fleet_ctr.py @@ -210,19 +210,14 @@ def do_dataset_training(self, fleet): # config dataset dataset = paddle.distributed.QueueDataset() pipe_command = 'python ctr_dataset_reader.py' - ''' + dataset.init( batch_size=batch_size, use_var=self.feeds, pipe_command=pipe_command, thread_num=thread_num) - ''' - dataset._set_batch_size(batch_size) - dataset._set_use_var(self.feeds) - dataset._set_pipe_command(pipe_command) dataset.set_filelist(filelist) - dataset._set_thread(thread_num) for epoch_id in range(1): pass_start = time.time() diff --git a/python/paddle/fluid/tests/unittests/test_dataset.py b/python/paddle/fluid/tests/unittests/test_dataset.py index ec0e4cc44b4f4..72c7f23748caa 100644 --- a/python/paddle/fluid/tests/unittests/test_dataset.py +++ b/python/paddle/fluid/tests/unittests/test_dataset.py @@ -93,12 +93,11 @@ def test_run_with_dump(self): dataset = paddle.distributed.InMemoryDataset() dataset.init( - batch_size=32, - thread_num=3, + batch_size=32, thread_num=3, pipe_command="cat", use_var=slots_vars) + dataset.update_settings(pipe_command="cat1") + dataset.init_distributed_settings( parse_ins_id=True, parse_content=True, - pipe_command="cat", - use_var=slots_vars, fea_eval=True, candidate_size=10000) dataset.set_filelist( @@ -227,12 +226,8 @@ def test_in_memory_dataset_run(self): dataset = 
paddle.distributed.InMemoryDataset() dataset.init( - batch_size=32, - thread_num=3, - pipe_command="cat", - use_var=slots_vars, - fea_eval=True, - candidate_size=1) + batch_size=32, thread_num=3, pipe_command="cat", use_var=slots_vars) + dataset.init_distributed_settings(fea_eval=True, candidate_size=1) dataset.set_filelist([ "test_in_memory_dataset_run_a.txt", "test_in_memory_dataset_run_b.txt" @@ -300,11 +295,8 @@ def test_in_memory_dataset_masterpatch(self): dataset = paddle.distributed.InMemoryDataset() dataset.init( - batch_size=32, - thread_num=1, - parse_ins_id=True, - pipe_command="cat", - use_var=slots_vars) + batch_size=32, thread_num=1, pipe_command="cat", use_var=slots_vars) + dataset.init_distributed_settings(parse_ins_id=True) dataset.set_filelist([ "test_in_memory_dataset_masterpatch_a.txt", "test_in_memory_dataset_masterpatch_b.txt" @@ -323,7 +315,8 @@ def test_in_memory_dataset_masterpatch(self): except Exception as e: self.assertTrue(False) - dataset._set_merge_by_lineid(2) + #dataset._set_merge_by_lineid(2) + datset.update_settings(merge_size=2) dataset.dataset.merge_by_lineid() os.remove("./test_in_memory_dataset_masterpatch_a.txt") @@ -367,11 +360,8 @@ def test_in_memory_dataset_masterpatch1(self): dataset = paddle.distributed.InMemoryDataset() dataset.init( - batch_size=32, - thread_num=1, - parse_ins_id=True, - pipe_command="cat", - use_var=slots_vars) + batch_size=32, thread_num=1, pipe_command="cat", use_var=slots_vars) + dataset.init_distributed_settings(parse_ins_id=True) dataset.set_filelist([ "test_in_memory_dataset_masterpatch1_a.txt", "test_in_memory_dataset_masterpatch1_b.txt" @@ -482,6 +472,21 @@ def test_in_memory_dataset_run_2(self): dataset._set_parse_ins_id(False) dataset.load_into_memory() dataset.dataset.merge_by_lineid() + dataset.update_settings( + batch_size=1, + thread_num=2, + input_type=1, + pipe_command="cat", + use_var=[], + fs_name="", + fs_ugi="", + download_cmd="cat", + merge_size=-1, + parse_ins_id=False, + parse_content=False, + fleet_send_batch_size=2, + fleet_send_sleep_seconds=2, + fea_eval=True) fleet_ptr = fluid.core.Fleet() fleet_ptr.set_client2client_config(1, 1, 1) fleet_ptr.get_cache_threshold(0) From 4d72af38dd846b9b691f894dd21d204d95ac599e Mon Sep 17 00:00:00 2001 From: yaoxuefeng6 Date: Mon, 14 Sep 2020 12:12:46 +0800 Subject: [PATCH 12/14] fix ut fail --- .../distributed/fleet/dataset/dataset.py | 199 ++++++++++++------ .../fluid/tests/unittests/test_dataset.py | 2 +- 2 files changed, 133 insertions(+), 68 deletions(-) diff --git a/python/paddle/distributed/fleet/dataset/dataset.py b/python/paddle/distributed/fleet/dataset/dataset.py index 0fe150dbfb4dd..8fa2114799281 100644 --- a/python/paddle/distributed/fleet/dataset/dataset.py +++ b/python/paddle/distributed/fleet/dataset/dataset.py @@ -250,6 +250,37 @@ def __init__(self): def init_distributed_settings(self, **kwargs): """ should be called only once in user's python scripts to initialize distributed-related setings of dataset instance + Args: + kwargs: Keyword arguments. Currently, we support following keys in **kwargs: + + merge_size(int): ins size to merge, if merge_size > 0, set merge by line id, + instances of same line id will be merged after shuffle, + you should parse line id in data generator. default is -1. + parse_ins_id(bool): Set if Dataset need to parse ins_id. default is False. + parse_content(bool): Set if Dataset need to parse content. default is False. 
+ fleet_send_batch_size(int): Set fleet send batch size in one rpc, default is 1024 + fleet_send_sleep_seconds(int): Set fleet send sleep time, default is 0 + fea_eval(bool): Set if Dataset need to do feature importance evaluation using slots shuffle. + default is False. + candidate_size(int): if fea_eval is set True, set the candidate size used in slots shuffle. + + Examples: + .. code-block:: python + + import paddle + dataset = paddle.distributed.InMemoryDataset() + dataset.init( + batch_size=1, + thread_num=2, + input_type=1, + pipe_command="cat", + use_var=[]) + dataset.init_distributed_settings( + parse_ins_id=True, + parse_content=True, + fea_eval=True, + candidate_size=10000) + """ merge_size = kwargs.get("merge_size", -1) if merge_size > 0: @@ -277,6 +308,50 @@ def init_distributed_settings(self, **kwargs): def update_settings(self, **kwargs): """ should be called in user's python scripts to update setings of dataset instance + Args: + kwargs: Keyword arguments. Currently, we support following keys in **kwargs, + including single node settings and advanced distributed related settings: + + batch_size(int): batch size. It will be effective during training. default: 1. + thread_num(int): thread num, it is the num of readers. default: 1. + use_var(list): list of variables. Variables which you will use. default: []. + input_type(int): the input type of generated input. 0 is for one sample, 1 is for one batch. defalut: 0. + fs_name(str): fs name. default: "". + fs_ugi(str): fs ugi. default: "". + pipe_command(str): pipe command of current dataset. A pipe command is a UNIX pipeline command that can be used only. default: "cat" + download_cmd(str): customized download command. default: "cat" + data_feed_type(str): data feed type used in c++ code. default: "MultiSlotInMemoryDataFeed". + queue_num(int): Dataset output queue num, training threads get data from queues. default:-1, which is set same as thread number in c++. + + merge_size(int): ins size to merge, if merge_size > 0, set merge by line id, + instances of same line id will be merged after shuffle, + you should parse line id in data generator. default is -1. + parse_ins_id(bool): Set if Dataset need to parse ins_id. default is False. + parse_content(bool): Set if Dataset need to parse content. default is False. + fleet_send_batch_size(int): Set fleet send batch size in one rpc, default is 1024 + fleet_send_sleep_seconds(int): Set fleet send sleep time, default is 0 + fea_eval(bool): Set if Dataset need to do feature importance evaluation using slots shuffle. + default is False. + candidate_size(int): if fea_eval is set True, set the candidate size used in slots shuffle. + + Examples: + .. code-block:: python + + import paddle + dataset = paddle.distributed.InMemoryDataset() + dataset.init( + batch_size=1, + thread_num=2, + input_type=1, + pipe_command="cat", + use_var=[]) + dataset.init_distributed_settings( + parse_ins_id=True, + parse_content=True, + fea_eval=True, + candidate_size=10000) + dataset.update_settings(batch_size=2) + """ for key in kwargs: if key == "pipe_command": @@ -310,6 +385,62 @@ def update_settings(self, **kwargs): def init(self, **kwargs): """ should be called only once in user's python scripts to initialize setings of dataset instance + Args: + kwargs: Keyword arguments. Currently, we support following keys in **kwargs: + + batch_size(int): batch size. It will be effective during training. default: 1. + thread_num(int): thread num, it is the num of readers. default: 1. + use_var(list): list of variables. 
Variables which you will use. default: []. + input_type(int): the input type of generated input. 0 is for one sample, 1 is for one batch. defalut: 0. + fs_name(str): fs name. default: "". + fs_ugi(str): fs ugi. default: "". + pipe_command(str): pipe command of current dataset. A pipe command is a UNIX pipeline command that can be used only. default: "cat" + download_cmd(str): customized download command. default: "cat" + data_feed_type(str): data feed type used in c++ code. default: "MultiSlotInMemoryDataFeed". + queue_num(int): Dataset output queue num, training threads get data from queues. default:-1, which is set same as thread number in c++. + + Examples: + .. code-block:: python + + import paddle + with open("test_queue_dataset_run_a.txt", "w") as f: + data = "2 1 2 2 5 4 2 2 7 2 1 3\n" + data += "2 6 2 2 1 4 2 2 4 2 2 3\n" + data += "2 5 2 2 9 9 2 2 7 2 1 3\n" + data += "2 7 2 2 1 9 2 3 7 2 5 3\n" + f.write(data) + with open("test_queue_dataset_run_b.txt", "w") as f: + data = "2 1 2 2 5 4 2 2 7 2 1 3\n" + data += "2 6 2 2 1 4 2 2 4 2 2 3\n" + data += "2 5 2 2 9 9 2 2 7 2 1 3\n" + data += "2 7 2 2 1 9 2 3 7 2 5 3\n" + f.write(data) + + slots = ["slot1", "slot2", "slot3", "slot4"] + slots_vars = [] + for slot in slots: + var = fluid.data( + name=slot, shape=[None, 1], dtype="int64", lod_level=1) + slots_vars.append(var) + + dataset = paddle.distributed.InMemoryDataset() + dataset.init( + batch_size=1, + thread_num=2, + input_type=1, + pipe_command="cat", + use_var=slots_vars) + dataset.set_filelist( + ["test_queue_dataset_run_a.txt", "test_queue_dataset_run_b.txt"]) + dataset.load_into_memory() + + exe = fluid.Executor(fluid.CPUPlace() if not core.is_compiled_with_cuda( + ) else fluid.CUDAPlace(0)) + exe.run(fluid.default_startup_program()) + exe.train_from_dataset(fluid.default_main_program(), + dataset) + os.remove("./test_queue_dataset_run_a.txt") + os.remove("./test_queue_dataset_run_b.txt") """ batch_size = kwargs.get("batch_size", 1) thread_num = kwargs.get("thread_num", 1) @@ -394,7 +525,7 @@ def _set_queue_num(self, queue_num): def _set_parse_ins_id(self, parse_ins_id): """ - Set id Dataset need to parse insid + Set if Dataset need to parse insid Args: parse_ins_id(bool): if parse ins_id or not @@ -791,54 +922,6 @@ def _prepare_to_run(self): self.dataset.set_data_feed_desc(self._desc()) self.dataset.create_readers() - def local_shuffle(self): - """ - Local shuffle data. - - Local shuffle is not supported in QueueDataset - NotImplementedError will be raised - - Examples: - .. code-block:: python - - import paddle - dataset = paddle.distributed.QueueDataset() - dataset.local_shuffle() - - Raises: - NotImplementedError: QueueDataset does not support local shuffle - - """ - raise NotImplementedError( - "QueueDataset does not support local shuffle, " - "please use InMemoryDataset for local_shuffle") - - def global_shuffle(self, fleet=None): - """ - Global shuffle data. - - Global shuffle is not supported in QueueDataset - NotImplementedError will be raised - - Args: - fleet(Fleet): fleet singleton. Default None. - - Examples: - .. 
code-block:: python - - import paddle - from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet - dataset = paddle.distributed.QueueDataset() - dataset.global_shuffle(fleet) - - Raises: - NotImplementedError: QueueDataset does not support global shuffle - - """ - raise NotImplementedError( - "QueueDataset does not support global shuffle, " - "please use InMemoryDataset for global_shuffle") - class FileInstantDataset(DatasetBase): """ @@ -864,24 +947,6 @@ def init(self, **kwargs): """ super(FileInstantDataset, self).init(**kwargs) - def local_shuffle(self): - """ - Local shuffle - FileInstantDataset does not support local shuffle - """ - raise NotImplementedError( - "FileInstantDataset does not support local shuffle, " - "please use InMemoryDataset for local_shuffle") - - def global_shuffle(self, fleet=None): - """ - Global shuffle - FileInstantDataset does not support global shuffle - """ - raise NotImplementedError( - "FileInstantDataset does not support global shuffle, " - "please use InMemoryDataset for global_shuffle") - class BoxPSDataset(InMemoryDataset): """ diff --git a/python/paddle/fluid/tests/unittests/test_dataset.py b/python/paddle/fluid/tests/unittests/test_dataset.py index 72c7f23748caa..0eaff5147a2f0 100644 --- a/python/paddle/fluid/tests/unittests/test_dataset.py +++ b/python/paddle/fluid/tests/unittests/test_dataset.py @@ -316,7 +316,7 @@ def test_in_memory_dataset_masterpatch(self): self.assertTrue(False) #dataset._set_merge_by_lineid(2) - datset.update_settings(merge_size=2) + dataset.update_settings(merge_size=2) dataset.dataset.merge_by_lineid() os.remove("./test_in_memory_dataset_masterpatch_a.txt") From c44da20c0cac2c62110d5c9fc47f90c67ab905bc Mon Sep 17 00:00:00 2001 From: yaoxuefeng6 Date: Tue, 15 Sep 2020 12:10:17 +0800 Subject: [PATCH 13/14] fix ci fail --- .../distributed/fleet/dataset/dataset.py | 56 ++++++++++++------- 1 file changed, 36 insertions(+), 20 deletions(-) diff --git a/python/paddle/distributed/fleet/dataset/dataset.py b/python/paddle/distributed/fleet/dataset/dataset.py index 8fa2114799281..fecada7ccb2bf 100644 --- a/python/paddle/distributed/fleet/dataset/dataset.py +++ b/python/paddle/distributed/fleet/dataset/dataset.py @@ -41,6 +41,22 @@ def init(self, fs_name="", fs_ugi="", download_cmd="cat"): + """ + should be called only once in user's python scripts to initialize setings of dataset instance. + Normally, it is called by InMemoryDataset or QueueDataset. + + Args: + batch_size(int): batch size. It will be effective during training. default is 1. + thread_num(int): thread num, it is the num of readers. default is 1. + use_var(list): list of variables. Variables which you will use. default is []. + pipe_command(str): pipe command of current dataset. A pipe command is a UNIX pipeline command that can be used only. default is "cat" + input_type(int): the input type of generated input. 0 is for one sample, 1 is for one batch. defalut is 0. + fs_name(str): fs name. default is "". + fs_ugi(str): fs ugi. default is "". + download_cmd(str): customized download command. default is "cat" + + + """ self._set_batch_size(batch_size) self._set_thread(thread_num) self._set_use_var(use_var) @@ -312,16 +328,16 @@ def update_settings(self, **kwargs): kwargs: Keyword arguments. Currently, we support following keys in **kwargs, including single node settings and advanced distributed related settings: - batch_size(int): batch size. It will be effective during training. default: 1. - thread_num(int): thread num, it is the num of readers. default: 1. 
- use_var(list): list of variables. Variables which you will use. default: []. - input_type(int): the input type of generated input. 0 is for one sample, 1 is for one batch. defalut: 0. - fs_name(str): fs name. default: "". - fs_ugi(str): fs ugi. default: "". - pipe_command(str): pipe command of current dataset. A pipe command is a UNIX pipeline command that can be used only. default: "cat" - download_cmd(str): customized download command. default: "cat" - data_feed_type(str): data feed type used in c++ code. default: "MultiSlotInMemoryDataFeed". - queue_num(int): Dataset output queue num, training threads get data from queues. default:-1, which is set same as thread number in c++. + batch_size(int): batch size. It will be effective during training. default is 1. + thread_num(int): thread num, it is the num of readers. default is 1. + use_var(list): list of variables. Variables which you will use. default is []. + input_type(int): the input type of generated input. 0 is for one sample, 1 is for one batch. defalut is 0. + fs_name(str): fs name. default is "". + fs_ugi(str): fs ugi. default is "". + pipe_command(str): pipe command of current dataset. A pipe command is a UNIX pipeline command that can be used only. default is "cat" + download_cmd(str): customized download command. default is "cat" + data_feed_type(str): data feed type used in c++ code. default is "MultiSlotInMemoryDataFeed". + queue_num(int): Dataset output queue num, training threads get data from queues. default is-1, which is set same as thread number in c++. merge_size(int): ins size to merge, if merge_size > 0, set merge by line id, instances of same line id will be merged after shuffle, @@ -388,16 +404,16 @@ def init(self, **kwargs): Args: kwargs: Keyword arguments. Currently, we support following keys in **kwargs: - batch_size(int): batch size. It will be effective during training. default: 1. - thread_num(int): thread num, it is the num of readers. default: 1. - use_var(list): list of variables. Variables which you will use. default: []. - input_type(int): the input type of generated input. 0 is for one sample, 1 is for one batch. defalut: 0. - fs_name(str): fs name. default: "". - fs_ugi(str): fs ugi. default: "". - pipe_command(str): pipe command of current dataset. A pipe command is a UNIX pipeline command that can be used only. default: "cat" - download_cmd(str): customized download command. default: "cat" - data_feed_type(str): data feed type used in c++ code. default: "MultiSlotInMemoryDataFeed". - queue_num(int): Dataset output queue num, training threads get data from queues. default:-1, which is set same as thread number in c++. + batch_size(int): batch size. It will be effective during training. default is 1. + thread_num(int): thread num, it is the num of readers. default is 1. + use_var(list): list of variables. Variables which you will use. default is []. + input_type(int): the input type of generated input. 0 is for one sample, 1 is for one batch. defalut is 0. + fs_name(str): fs name. default is "". + fs_ugi(str): fs ugi. default is "". + pipe_command(str): pipe command of current dataset. A pipe command is a UNIX pipeline command that can be used only. default is "cat" + download_cmd(str): customized download command. default is "cat" + data_feed_type(str): data feed type used in c++ code. default is "MultiSlotInMemoryDataFeed". + queue_num(int): Dataset output queue num, training threads get data from queues. default is -1, which is set same as thread number in c++. Examples: .. 
code-block:: python From 1f822a0921db5c050c630cc6166a3239d3623ddd Mon Sep 17 00:00:00 2001 From: yaoxuefeng6 Date: Tue, 15 Sep 2020 16:39:10 +0800 Subject: [PATCH 14/14] hide init_distributed_settings --- python/paddle/distributed/fleet/dataset/dataset.py | 6 +++--- python/paddle/fluid/tests/unittests/test_dataset.py | 8 ++++---- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/python/paddle/distributed/fleet/dataset/dataset.py b/python/paddle/distributed/fleet/dataset/dataset.py index fecada7ccb2bf..5bd971181ed34 100644 --- a/python/paddle/distributed/fleet/dataset/dataset.py +++ b/python/paddle/distributed/fleet/dataset/dataset.py @@ -263,7 +263,7 @@ def __init__(self): self.merge_by_lineid = False self.fleet_send_sleep_seconds = None - def init_distributed_settings(self, **kwargs): + def _init_distributed_settings(self, **kwargs): """ should be called only once in user's python scripts to initialize distributed-related setings of dataset instance Args: @@ -291,7 +291,7 @@ def init_distributed_settings(self, **kwargs): input_type=1, pipe_command="cat", use_var=[]) - dataset.init_distributed_settings( + dataset._init_distributed_settings( parse_ins_id=True, parse_content=True, fea_eval=True, @@ -361,7 +361,7 @@ def update_settings(self, **kwargs): input_type=1, pipe_command="cat", use_var=[]) - dataset.init_distributed_settings( + dataset._init_distributed_settings( parse_ins_id=True, parse_content=True, fea_eval=True, diff --git a/python/paddle/fluid/tests/unittests/test_dataset.py b/python/paddle/fluid/tests/unittests/test_dataset.py index 0eaff5147a2f0..208956b825ed1 100644 --- a/python/paddle/fluid/tests/unittests/test_dataset.py +++ b/python/paddle/fluid/tests/unittests/test_dataset.py @@ -95,7 +95,7 @@ def test_run_with_dump(self): dataset.init( batch_size=32, thread_num=3, pipe_command="cat", use_var=slots_vars) dataset.update_settings(pipe_command="cat1") - dataset.init_distributed_settings( + dataset._init_distributed_settings( parse_ins_id=True, parse_content=True, fea_eval=True, @@ -227,7 +227,7 @@ def test_in_memory_dataset_run(self): dataset = paddle.distributed.InMemoryDataset() dataset.init( batch_size=32, thread_num=3, pipe_command="cat", use_var=slots_vars) - dataset.init_distributed_settings(fea_eval=True, candidate_size=1) + dataset._init_distributed_settings(fea_eval=True, candidate_size=1) dataset.set_filelist([ "test_in_memory_dataset_run_a.txt", "test_in_memory_dataset_run_b.txt" @@ -296,7 +296,7 @@ def test_in_memory_dataset_masterpatch(self): dataset = paddle.distributed.InMemoryDataset() dataset.init( batch_size=32, thread_num=1, pipe_command="cat", use_var=slots_vars) - dataset.init_distributed_settings(parse_ins_id=True) + dataset._init_distributed_settings(parse_ins_id=True) dataset.set_filelist([ "test_in_memory_dataset_masterpatch_a.txt", "test_in_memory_dataset_masterpatch_b.txt" @@ -361,7 +361,7 @@ def test_in_memory_dataset_masterpatch1(self): dataset = paddle.distributed.InMemoryDataset() dataset.init( batch_size=32, thread_num=1, pipe_command="cat", use_var=slots_vars) - dataset.init_distributed_settings(parse_ins_id=True) + dataset._init_distributed_settings(parse_ins_id=True) dataset.set_filelist([ "test_in_memory_dataset_masterpatch1_a.txt", "test_in_memory_dataset_masterpatch1_b.txt"
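
Taken together, the patches above converge on a single configuration flow for the refactored dataset API: construct the concrete class directly, pass single-node settings to ``init()``, pass distributed-only settings to the underscore-prefixed ``_init_distributed_settings()`` helper (added in PATCH 11, hidden in PATCH 14), and revise individual settings later through ``update_settings()``. The sketch below is assembled from the docstring and unit-test examples in this series; the file names, data lines, and parameter values are illustrative only.

.. code-block:: python

    import os
    import paddle
    import paddle.fluid as fluid

    # toy input files in the multi-slot text format used by the unit tests
    with open("demo_run_a.txt", "w") as f:
        f.write("1 1 2 3 3 4 5 5 5 5 1 1\n1 2 2 3 4 4 6 6 6 6 1 2\n")
    with open("demo_run_b.txt", "w") as f:
        f.write("1 3 2 3 5 4 7 7 7 7 1 3\n1 4 2 3 6 4 8 8 8 8 1 4\n")

    slots_vars = [
        fluid.data(name=name, shape=[None, 1], dtype="int64", lod_level=1)
        for name in ["slot1", "slot2", "slot3", "slot4"]
    ]

    # single-node settings: one init() call replaces the old per-field setters
    dataset = paddle.distributed.InMemoryDataset()
    dataset.init(
        batch_size=32, thread_num=3, pipe_command="cat", use_var=slots_vars)

    # distributed-related settings live in the private helper after PATCH 14
    dataset._init_distributed_settings(fea_eval=True, candidate_size=1)

    # any setting can be revised in place without rebuilding the dataset
    dataset.update_settings(batch_size=1)

    dataset.set_filelist(["demo_run_a.txt", "demo_run_b.txt"])
    dataset.load_into_memory()

    exe = fluid.Executor(fluid.CPUPlace())
    exe.run(fluid.default_startup_program())
    exe.train_from_dataset(fluid.default_main_program(), dataset)

    os.remove("demo_run_a.txt")
    os.remove("demo_run_b.txt")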
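
The test-suite hunks make the same mechanical substitution throughout: the string-keyed ``DatasetFactory().create_dataset(...)`` call is replaced by direct construction of the concrete dataset class, and the chain of per-field ``_set_*`` calls collapses into one ``init()`` call. A before/after sketch of that migration, with illustrative slot names and file names (the commented-out lines are the calls removed by this series, as seen in dist_fleet_ctr.py):

.. code-block:: python

    import paddle
    import paddle.fluid as fluid

    slots_vars = [
        fluid.data(name=name, shape=[None, 1], dtype="int64", lod_level=1)
        for name in ["slot1", "slot2"]
    ]

    # old style, removed by this series:
    #   dataset = paddle.distributed.fleet.DatasetFactory().create_dataset(
    #       "QueueDataset")
    #   dataset._set_batch_size(32)
    #   dataset._set_thread(3)
    #   dataset._set_pipe_command("cat")
    #   dataset._set_use_var(slots_vars)

    # new style: construct the class directly and configure it in one call
    dataset = paddle.distributed.QueueDataset()
    dataset.init(
        batch_size=32, thread_num=3, pipe_command="cat", use_var=slots_vars)
    dataset.set_filelist(
        ["test_queue_dataset_run_a.txt", "test_queue_dataset_run_b.txt"])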