From 374581d329bdab2432c34324410953c0c5f33068 Mon Sep 17 00:00:00 2001
From: "lielin.hyl"
Date: Tue, 28 Nov 2023 19:39:05 +0800
Subject: [PATCH 1/8] * after stats computed, save the stats first then do
 analysis

---
 data_juicer/core/analyser.py | 23 ++++++++++++-----------
 1 file changed, 12 insertions(+), 11 deletions(-)

diff --git a/data_juicer/core/analyser.py b/data_juicer/core/analyser.py
index c1b0b93af..d08f972ad 100644
--- a/data_juicer/core/analyser.py
+++ b/data_juicer/core/analyser.py
@@ -102,11 +102,18 @@ def run(self, load_data_np=None):
                     'the process list in configs.')
             return dataset
 
-        # 3. analysis and output result to the export path
-        # 3.1. Only consider fields in Fields.stats
-        # 3.2. For string fields, only consider its histogram
-        # 3.3. For numeric fields, consider its histogram and box
-        # 3.4. Otherwise, DO NOT analyse
+        # 3. data export
+        logger.info('Exporting dataset to disk...')
+        self.exporter.export(dataset)
+        if self.cfg.use_cache and self.cfg.cache_compress:
+            from data_juicer.utils.compress import compress
+            compress(dataset)
+
+        # 4. analysis and output result to the export path
+        # 4.1. Only consider fields in Fields.stats
+        # 4.2. For string fields, only consider its histogram
+        # 4.3. For numeric fields, consider its histogram and box
+        # 4.4. Otherwise, DO NOT analyse
         logger.info('Applying overall analysis on stats...')
         overall_analysis = OverallAnalysis(dataset, self.analysis_path)
 
@@ -120,10 +127,4 @@ def run(self, load_data_np=None):
             save_stats_in_one_file=self.cfg.save_stats_in_one_file)
         column_wise_analysis.analyse()
 
-        # 4. data export
-        logger.info('Exporting dataset to disk...')
-        self.exporter.export(dataset)
-        if self.cfg.use_cache and self.cfg.cache_compress:
-            from data_juicer.utils.compress import compress
-            compress(dataset)
         return dataset
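The point of this reordering is crash-safety: once the stats have been
computed, they are persisted to disk before any analysis step that might
fail. A standalone toy sketch of the idea (plain Python, no data-juicer
imports; all names here are illustrative):

    import json
    import os
    import tempfile

    def export_stats(stats, path):
        # step 3: persist the computed stats first
        with open(path, 'w') as f:
            for row in stats:
                f.write(json.dumps(row) + '\n')

    def analyse(stats):
        # step 4: analysis may still fail, e.g. due to a plotting error
        raise RuntimeError('analysis failed')

    stats = [{'char_len': 12}, {'char_len': 48}]
    path = os.path.join(tempfile.gettempdir(), 'demo_stats.jsonl')

    export_stats(stats, path)
    try:
        analyse(stats)
    except RuntimeError as err:
        print(f'{err}; stats are already safe at {path}')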
From e704da272dbfb3bf61e43549ff561ad4a371a355 Mon Sep 17 00:00:00 2001
From: "lielin.hyl"
Date: Tue, 28 Nov 2023 20:00:25 +0800
Subject: [PATCH 2/8] * use map to add new column instead of add_column method
 to speed up

---
 data_juicer/core/analyser.py                  | 13 ++++++++-----
 data_juicer/core/data.py                      | 14 ++++++++++++++
 data_juicer/core/executor.py                  | 14 +++++++++-----
 data_juicer/format/formatter.py               | 15 +++++++++++----
 data_juicer/format/text_formatter.py          |  2 +-
 .../dataset_splitting_by_language.py          | 13 +++++++++----
 tools/preprocess/dataset_split_by_language.py | 12 ++++++++----
 7 files changed, 60 insertions(+), 23 deletions(-)

diff --git a/data_juicer/core/analyser.py b/data_juicer/core/analyser.py
index d08f972ad..84083e3f4 100644
--- a/data_juicer/core/analyser.py
+++ b/data_juicer/core/analyser.py
@@ -9,6 +9,7 @@
 from data_juicer.utils import cache_utils
 from data_juicer.utils.constant import Fields
 
+from .data import add_same_content_to_new_column
 from .exporter import Exporter
 
 
@@ -87,12 +88,14 @@ def run(self, load_data_np=None):
             op_name = list(op_cfg.keys())[0]
             if isinstance(op, Filter):
                 if Fields.stats not in dataset.features:
-                    # TODO:
-                    # this is a temp solution,
                     # only add stats when calling filter op
-                    dataset = dataset.add_column(name=Fields.stats,
-                                                 column=[{}] *
-                                                 dataset.num_rows)
+                    dataset = dataset.map(add_same_content_to_new_column,
+                                          fn_kwargs={
+                                              'new_column_name': Fields.stats,
+                                              'initial_value': {}
+                                          },
+                                          num_proc=self.cfg.np,
+                                          desc='Adding new column for stats')
                 dataset = dataset.map(op.compute_stats,
                                       num_proc=self.cfg.np,
                                       desc=op_name + '_compute_stats')
diff --git a/data_juicer/core/data.py b/data_juicer/core/data.py
index 6e43a4524..326730c6b 100644
--- a/data_juicer/core/data.py
+++ b/data_juicer/core/data.py
@@ -317,3 +317,17 @@ def nested_query(root_obj: Union[NestedDatasetDict, NestedDataset,
             return None
 
     return None
+
+
+def add_same_content_to_new_column(sample,
+                                   new_column_name,
+                                   initial_value=None):
+    """
+    A helper function to speed up the add_column function. Apply this
+    function with map in parallel instead of using add_column.
+    :param sample: a single sample to add this new column/field.
+    :param new_column_name: the name of this new column/field.
+    :param initial_value: the initial value of this new column/field.
+    """
+    sample[new_column_name] = initial_value
+    return sample
diff --git a/data_juicer/core/executor.py b/data_juicer/core/executor.py
index 8fb0c8b2b..b724f7106 100644
--- a/data_juicer/core/executor.py
+++ b/data_juicer/core/executor.py
@@ -11,6 +11,7 @@
 from data_juicer.utils.ckpt_utils import CheckpointManager
 from data_juicer.utils.constant import Fields
 
+from .data import add_same_content_to_new_column
 from .exporter import Exporter
 from .tracer import Tracer
 
@@ -125,12 +126,15 @@ def run(self, load_data_np=None):
                                             op.text_key)
                 elif isinstance(op, Filter):
                     if Fields.stats not in dataset.features:
-                        # TODO:
-                        # this is a temp solution,
                         # only add stats when calling filter op
-                        dataset = dataset.add_column(name=Fields.stats,
-                                                     column=[{}] *
-                                                     dataset.num_rows)
+                        dataset = dataset.map(
+                            add_same_content_to_new_column,
+                            fn_kwargs={
+                                'new_column_name': Fields.stats,
+                                'initial_value': {}
+                            },
+                            num_proc=self.cfg.np,
+                            desc='Adding new column for stats')
                     if self.cfg.use_checkpoint:
                         prev = dataset
                     dataset = dataset.map(op.compute_stats,
diff --git a/data_juicer/format/formatter.py b/data_juicer/format/formatter.py
index c47ad2414..384816fd3 100644
--- a/data_juicer/format/formatter.py
+++ b/data_juicer/format/formatter.py
@@ -69,7 +69,7 @@ def load_dataset(self, num_proc: int = 1, global_cfg=None) -> Dataset:
                                 **self.kwargs)
         if self.add_suffix:
             logger.info('Add suffix info into dataset...')
-            datasets = add_suffixes(datasets)
+            datasets = add_suffixes(datasets, num_proc)
         else:
             from data_juicer.core.data import NestedDataset
             datasets = NestedDataset(
@@ -120,18 +120,25 @@ def load_dataset(self, num_proc: int = 1, global_cfg=None) -> Dataset:
     return ds
 
 
-def add_suffixes(datasets: DatasetDict) -> Dataset:
+def add_suffixes(datasets: DatasetDict, num_proc: int = 1) -> Dataset:
     """
     Add suffix field to datasets.
 
     :param datasets: a DatasetDict object
+    :param num_proc: number of processes to add suffixes
     :return: datasets with suffix features.
     """
     logger.info('Add suffix column for dataset')
+    from data_juicer.core.data import add_same_content_to_new_column
     for key, ds in datasets.items():
         if Fields.suffix not in ds.features:
-            datasets[key] = ds.add_column(name=Fields.suffix,
-                                          column=['.' + key] * ds.num_rows)
+            datasets[key] = ds.map(add_same_content_to_new_column,
+                                   fn_kwargs={
+                                       'new_column_name': Fields.suffix,
+                                       'initial_value': '.' + key
+                                   },
+                                   num_proc=num_proc,
+                                   desc='Adding new column for suffix')
     datasets = concatenate_datasets([ds for _, ds in datasets.items()])
     from data_juicer.core.data import NestedDataset
     return NestedDataset(datasets)
diff --git a/data_juicer/format/text_formatter.py b/data_juicer/format/text_formatter.py
index 0f34930c7..4e3cf5cb1 100644
--- a/data_juicer/format/text_formatter.py
+++ b/data_juicer/format/text_formatter.py
@@ -150,7 +150,7 @@ def load_dataset(self, num_proc: int = 1, global_cfg=None) -> Dataset:
         # whether to add file suffix to dataset meta info
         if self.add_suffix:
             logger.info('Add suffix info into dataset...')
-            datasets = add_suffixes(datasets)
+            datasets = add_suffixes(datasets, num_proc)
         else:
             datasets = concatenate_datasets([ds for _, ds in datasets.items()])
         return unify_format(datasets,
diff --git a/demos/tool_dataset_splitting_by_language/dataset_splitting_by_language.py b/demos/tool_dataset_splitting_by_language/dataset_splitting_by_language.py
index 736a3dbfb..e3fe959ce 100644
--- a/demos/tool_dataset_splitting_by_language/dataset_splitting_by_language.py
+++ b/demos/tool_dataset_splitting_by_language/dataset_splitting_by_language.py
@@ -7,6 +7,7 @@
 import pandas as pd
 from loguru import logger
 
+from data_juicer.core.data import add_same_content_to_new_column
 from data_juicer.format import load_formatter
 from data_juicer.ops.filter.language_id_score_filter import \
     LanguageIDScoreFilter
@@ -51,11 +52,15 @@ def main(src_dir, target_dir, text_key=None, suffixes=[], num_proc=1):
     op = LanguageIDScoreFilter(text_key=text_key)
 
     if Fields.stats not in dataset.features:
-        # TODO:
-        # this is a temp solution,
         # only add stats when calling filter op
-        dataset = dataset.add_column(name=Fields.stats,
-                                     column=[{}] * dataset.num_rows)
+        dataset = dataset.map(
+            add_same_content_to_new_column,
+            fn_kwargs={
+                'new_column_name': Fields.stats,
+                'initial_value': {}
+            },
+            num_proc=num_proc,
+            desc='Adding new column for stats')
 
     # identify language
     dataset = dataset.map(op.compute_stats, num_proc=num_proc)
diff --git a/tools/preprocess/dataset_split_by_language.py b/tools/preprocess/dataset_split_by_language.py
index f2b28a90c..f3c411404 100644
--- a/tools/preprocess/dataset_split_by_language.py
+++ b/tools/preprocess/dataset_split_by_language.py
@@ -7,6 +7,7 @@
 import pandas as pd
 from loguru import logger
 
+from data_juicer.core.data import add_same_content_to_new_column
 from data_juicer.format import load_formatter
 from data_juicer.ops.filter.language_id_score_filter import \
     LanguageIDScoreFilter
@@ -52,11 +53,14 @@ def main(src_dir, target_dir, text_key=None, suffixes=[], num_proc=1):
     op = LanguageIDScoreFilter(text_key=text_key)
 
     if Fields.stats not in dataset.features:
-        # TODO:
-        # this is a temp solution,
         # only add stats when calling filter op
-        dataset = dataset.add_column(name=Fields.stats,
-                                     column=[{}] * dataset.num_rows)
+        dataset = dataset.map(add_same_content_to_new_column,
+                              fn_kwargs={
+                                  'new_column_name': Fields.stats,
+                                  'initial_value': {}
+                              },
+                              num_proc=num_proc,
+                              desc='Adding new column for stats')
 
     # identify language
     dataset = dataset.map(op.compute_stats, num_proc=num_proc)
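Why map() instead of add_column(): add_column materializes the entire new
column in the driver process, while map() writes the constant value sample
by sample and can be sharded across num_proc workers. A minimal,
self-contained demonstration on a toy HuggingFace dataset (the column name
'stats' is a placeholder, not the project's Fields constant):

    from datasets import Dataset

    def add_same_content_to_new_column(sample, new_column_name,
                                       initial_value=None):
        sample[new_column_name] = initial_value
        return sample

    if __name__ == '__main__':
        ds = Dataset.from_dict({'text': ['hello', 'world', 'foo']})

        # before: the column is built in the driver process
        # ds = ds.add_column(name='stats', column=[{}] * ds.num_rows)

        # after: the same value is written per sample, in parallel
        ds = ds.map(add_same_content_to_new_column,
                    fn_kwargs={'new_column_name': 'stats',
                               'initial_value': {}},
                    num_proc=2,
                    desc='Adding new column for stats')
        print(ds.features)  # 'text' plus the new 'stats' column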
From 9c31b842479d3550d259314317684bf32d99b7f3 Mon Sep 17 00:00:00 2001
From: "lielin.hyl"
Date: Tue, 28 Nov 2023 21:15:22 +0800
Subject: [PATCH 3/8] * use multiprocessing to speed up overall analysis +
 support analyse for DataFrame column of list type

---
 data_juicer/analysis/overall_analysis.py | 54 +++++++++++++++++++++++-
 data_juicer/core/analyser.py             |  2 +-
 2 files changed, 53 insertions(+), 3 deletions(-)

diff --git a/data_juicer/analysis/overall_analysis.py b/data_juicer/analysis/overall_analysis.py
index 117d43d68..acf8539ff 100644
--- a/data_juicer/analysis/overall_analysis.py
+++ b/data_juicer/analysis/overall_analysis.py
@@ -1,10 +1,18 @@
 import os
+from multiprocessing import Pool
 
 import pandas as pd
+from loguru import logger
+from tqdm import tqdm
 
 from data_juicer.utils.constant import Fields
 
 
+def _single_column_analysis(col, *args, **kwargs):
+    col_overall = col.describe(*args, **kwargs)
+    return col_overall
+
+
 class OverallAnalysis:
     """Apply analysis on the overall stats, including mean,
     std, quantiles, etc."""
@@ -23,18 +31,60 @@ def __init__(self, dataset, output_path):
 
         # default percentiles to analyse
         self.default_percentiles = [0.25, 0.5, 0.75]
+        # supported dtypes of column to be analysed
+        # Notice: there won't be mixed types in a column because the stats
+        # are obtained from a Dataset, which doesn't allow mixed types.
+        # Notice: for now, stats can only be:
+        # {numbers, string, list of one of before}
+        self.supported_object_types = {str, list}
 
-    def analyse(self, percentiles=[]):
+    def refine_single_column(self, col):
+        if col.dtype != 'object':
+            # not an object, return directly
+            return col
+        # if the type of this column is object, we can decide the actual type
+        # according to the first element.
+        first = col[0]
+        if type(first) not in self.supported_object_types:
+            logger.warning(f'There is a column of stats with type '
+                           f'[{type(first)}], which is not supported to be '
+                           f'analysed for now.')
+            return None
+        if type(first) is str:
+            # describe(include='all') can analyse the string type
+            return col
+        elif type(first) is list:
+            # flatten and infer the type
+            col = col.explode().infer_objects()
+            return col
+
+    def analyse(self, percentiles=[], num_proc=1):
         """
         Apply overall analysis on the whole dataset based on the describe
         method of pandas.
 
         :param percentiles: percentiles to analyse
+        :param num_proc: number of processes to analyse the dataset
         :return: the overall analysis result.
         """
         # merge default and customized percentiles and get overall information
         percentiles = list(set(percentiles + self.default_percentiles))
-        overall = self.stats.describe(percentiles=percentiles, include='all')
+
+        results = []
+        pool = Pool(num_proc)
+        for col_name in self.stats.columns:
+            this_col = self.refine_single_column(self.stats[col_name])
+            res = pool.apply_async(_single_column_analysis,
+                                   kwds={
+                                       'col': this_col,
+                                       'percentiles': percentiles,
+                                       'include': 'all',
+                                   })
+            results.append(res)
+        pool.close()
+        pool.join()
+        result_cols = [res.get() for res in tqdm(results)]
+        overall = pd.DataFrame(result_cols).T
 
         # export to result report file
         overall.to_csv(os.path.join(self.output_path, 'overall.csv'))
diff --git a/data_juicer/core/analyser.py b/data_juicer/core/analyser.py
index 84083e3f4..8636a2a53 100644
--- a/data_juicer/core/analyser.py
+++ b/data_juicer/core/analyser.py
@@ -120,7 +120,7 @@ def run(self, load_data_np=None):
         logger.info('Applying overall analysis on stats...')
         overall_analysis = OverallAnalysis(dataset, self.analysis_path)
 
-        self.overall_result = overall_analysis.analyse()
+        self.overall_result = overall_analysis.analyse(num_proc=self.cfg.np)
 
         logger.info('Applying column-wise analysis on stats...')
         column_wise_analysis = ColumnWiseAnalysis(
From eb02bfea8ae0bd4152cc0bef758ca1e0d0cf603f Mon Sep 17 00:00:00 2001
From: "lielin.hyl"
Date: Wed, 29 Nov 2023 11:30:51 +0800
Subject: [PATCH 4/8] + support column-wise analysis for DataFrame column of
 list type

---
 data_juicer/analysis/column_wise_analysis.py | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)

diff --git a/data_juicer/analysis/column_wise_analysis.py b/data_juicer/analysis/column_wise_analysis.py
index 92484d598..f9c0e42b0 100644
--- a/data_juicer/analysis/column_wise_analysis.py
+++ b/data_juicer/analysis/column_wise_analysis.py
@@ -3,6 +3,7 @@
 
 import matplotlib.pyplot as plt
 import pandas as pd
+from tqdm import tqdm
 
 from data_juicer.utils.constant import Fields
 
@@ -111,8 +112,10 @@ def analyse(self, show_percentiles=False, show=False):
             fig = plt.figure(figsize=(rec_width, rec_height),
                              layout='constrained')
             subfigs = fig.subfigures(rec_row, rec_col, wspace=0.01)
-        for i, column_name in enumerate(columns):
+        for i, column_name in tqdm(enumerate(columns), desc='Column'):
             data = self.stats[column_name]
+            # explode data to flatten inner list
+            data = data.explode().infer_objects()
             grid = grid_indexes[i]
             if self.save_stats_in_one_file:
                 if rec_col == 1:
@@ -128,7 +131,8 @@ def analyse(self, show_percentiles=False, show=False):
             # numeric or string via nan. Apply different plot method for them.
             if pd.isna(self.overall_result[column_name].get('top')):
-                # numeric -- draw histogram and box plot for this stat
+                # numeric or numeric list -- draw histogram and box plot for
+                # this stat
                 percentiles = self.overall_result[column_name] \
                     if show_percentiles else None
 
@@ -152,7 +156,8 @@ def analyse(self, show_percentiles=False, show=False):
                                              f'{column_name}-box.png'),
                     percentiles=percentiles)
             else:
-                # object (string) -- only draw histogram for this stat
+                # object (string) or string list -- only draw histogram for
+                # this stat
                 if self.save_stats_in_one_file:
                     axes = subfig.subplots(1, 1)
                 else:

From dd465e896c9952576d6cfbcaf7597e2e0a433195 Mon Sep 17 00:00:00 2001
From: "lielin.hyl"
Date: Wed, 29 Nov 2023 11:34:59 +0800
Subject: [PATCH 5/8] + support column-wise analysis for DataFrame column of
 list type

---
 data_juicer/analysis/column_wise_analysis.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/data_juicer/analysis/column_wise_analysis.py b/data_juicer/analysis/column_wise_analysis.py
index f9c0e42b0..fe8b38817 100644
--- a/data_juicer/analysis/column_wise_analysis.py
+++ b/data_juicer/analysis/column_wise_analysis.py
@@ -112,7 +112,8 @@ def analyse(self, show_percentiles=False, show=False):
             fig = plt.figure(figsize=(rec_width, rec_height),
                              layout='constrained')
             subfigs = fig.subfigures(rec_row, rec_col, wspace=0.01)
-        for i, column_name in tqdm(enumerate(columns), desc='Column'):
+        for i, column_name in tqdm(enumerate(columns.to_list()),
+                                   desc='Column'):
             data = self.stats[column_name]
             # explode data to flatten inner list
             data = data.explode().infer_objects()

From 886d9ea14e398d962c6443f269b66481eec50b7f Mon Sep 17 00:00:00 2001
From: "lielin.hyl"
Date: Wed, 29 Nov 2023 11:39:03 +0800
Subject: [PATCH 6/8] + support column-wise analysis for DataFrame column of
 list type

---
 data_juicer/analysis/column_wise_analysis.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/data_juicer/analysis/column_wise_analysis.py b/data_juicer/analysis/column_wise_analysis.py
index fe8b38817..5fe732cbc 100644
--- a/data_juicer/analysis/column_wise_analysis.py
+++ b/data_juicer/analysis/column_wise_analysis.py
@@ -112,8 +112,8 @@ def analyse(self, show_percentiles=False, show=False):
             fig = plt.figure(figsize=(rec_width, rec_height),
                              layout='constrained')
             subfigs = fig.subfigures(rec_row, rec_col, wspace=0.01)
-        for i, column_name in tqdm(enumerate(columns.to_list()),
-                                   desc='Column'):
+        for i, column_name in enumerate(tqdm(columns.to_list(),
+                                             desc='Column')):
             data = self.stats[column_name]
             # explode data to flatten inner list
             data = data.explode().infer_objects()
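The explode().infer_objects() idiom these three patches rely on turns a
list-valued stats column into a flat, properly typed Series that the
histogram and box plots can consume. A quick pandas-only illustration:

    import pandas as pd

    # a stats column whose cells are lists, as some Filters produce
    stats = pd.DataFrame({'word_len': [[3, 5], [4], [2, 8, 6]]})
    col = stats['word_len']
    print(col.dtype)  # object: each cell holds a Python list

    # one row per inner element, with the element dtype re-inferred
    flat = col.explode().infer_objects()
    print(flat.dtype)     # int64
    print(flat.tolist())  # [3, 5, 4, 2, 8, 6]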
From c5465dee77fdb115f0d98289050300d9d4cd780d Mon Sep 17 00:00:00 2001
From: "lielin.hyl"
Date: Wed, 29 Nov 2023 14:28:55 +0800
Subject: [PATCH 7/8] + add two arguments to control whether to keep stats or
 hashes in the result dataset

---
 data_juicer/config/config.py | 14 ++++++++++++
 data_juicer/core/executor.py | 10 ++++++---
 data_juicer/core/exporter.py | 42 +++++++++++++++++++++++++++---------
 3 files changed, 53 insertions(+), 13 deletions(-)

diff --git a/data_juicer/config/config.py b/data_juicer/config/config.py
index c6eef4329..a3611c56f 100644
--- a/data_juicer/config/config.py
+++ b/data_juicer/config/config.py
@@ -89,6 +89,20 @@ def init_configs(args=None):
         'due to the IO blocking, especially for very large datasets. '
         'When this happens, False is a better choice, although it takes '
         'more time.')
+    parser.add_argument(
+        '--keep_stats_in_res_ds',
+        type=bool,
+        default=False,
+        help='Whether to keep the computed stats in the result dataset. If '
+        'it\'s False, the intermediate fields to store the stats '
+        'computed by Filters will be removed. Default: False.')
+    parser.add_argument(
+        '--keep_hashes_in_res_ds',
+        type=bool,
+        default=False,
+        help='Whether to keep the computed hashes in the result dataset. If '
+        'it\'s False, the intermediate fields to store the hashes '
+        'computed by Deduplicators will be removed. Default: False.')
     parser.add_argument('--np',
                         type=PositiveInt,
                         default=4,
diff --git a/data_juicer/core/executor.py b/data_juicer/core/executor.py
index b724f7106..93d4cb30b 100644
--- a/data_juicer/core/executor.py
+++ b/data_juicer/core/executor.py
@@ -66,9 +66,13 @@ def __init__(self, cfg=None):
 
         # prepare exporter and check export path suffix
         logger.info('Preparing exporter...')
-        self.exporter = Exporter(self.cfg.export_path,
-                                 self.cfg.export_shard_size,
-                                 self.cfg.export_in_parallel, self.cfg.np)
+        self.exporter = Exporter(
+            self.cfg.export_path,
+            self.cfg.export_shard_size,
+            self.cfg.export_in_parallel,
+            self.cfg.np,
+            keep_stats_in_res_ds=self.cfg.keep_stats_in_res_ds,
+            keep_hashes_in_res_ds=self.cfg.keep_hashes_in_res_ds)
 
         # setup tracer
         self.open_tracer = self.cfg.open_tracer
diff --git a/data_juicer/core/exporter.py b/data_juicer/core/exporter.py
index 7377e5225..a8c7c35f9 100644
--- a/data_juicer/core/exporter.py
+++ b/data_juicer/core/exporter.py
@@ -3,7 +3,7 @@
 
 from loguru import logger
 
-from data_juicer.utils.constant import Fields
+from data_juicer.utils.constant import Fields, HashKeys
 
 
 class Exporter:
@@ -21,6 +21,8 @@ def __init__(self,
                  export_in_parallel=True,
                  num_proc=1,
                  export_ds=True,
+                 keep_stats_in_res_ds=False,
+                 keep_hashes_in_res_ds=False,
                  export_stats=True):
         """
         Initialization method.
@@ -31,12 +33,18 @@ def __init__(self,
             to a single file.
         :param num_proc: number of processes to export the dataset.
         :param export_ds: whether to export the dataset contents.
+        :param keep_stats_in_res_ds: whether to keep stats in the result
+            dataset.
+        :param keep_hashes_in_res_ds: whether to keep hashes in the result
+            dataset.
         :param export_stats: whether to export the stats of dataset.
         """
         self.export_path = export_path
         self.export_shard_size = export_shard_size
         self.export_in_parallel = export_in_parallel
         self.export_ds = export_ds
+        self.keep_stats_in_res_ds = keep_stats_in_res_ds
+        self.keep_hashes_in_res_ds = keep_hashes_in_res_ds
         self.export_stats = export_stats
         self.suffix = self._get_suffix(export_path)
         self.num_proc = num_proc
@@ -98,8 +106,31 @@ def _export_impl(self, dataset, export_path, suffix, export_stats=True):
         :param export_stats: whether to export stats of dataset.
         :return:
         """
+        if Fields.stats in dataset.features and export_stats:
+            # export stats of datasets into a single file.
+            logger.info('Exporting computed stats into a single file...')
+            ds_stats = dataset.select_columns(Fields.stats)
+            stats_file = export_path.replace('.' + suffix, '_stats.jsonl')
+            Exporter.to_jsonl(
+                ds_stats,
+                stats_file,
+                num_proc=self.num_proc if self.export_in_parallel else 1)
+
         if self.export_ds:
             # fetch the corresponding export method according to the suffix
+            if not self.keep_stats_in_res_ds:
+                extra_fields = {Fields.stats}
+                feature_fields = set(dataset.features.keys())
+                removed_fields = extra_fields.intersection(feature_fields)
+                dataset = dataset.remove_columns(removed_fields)
+            if not self.keep_hashes_in_res_ds:
+                extra_fields = {
+                    HashKeys.hash, HashKeys.minhash, HashKeys.simhash,
+                    HashKeys.imagehash
+                }
+                feature_fields = set(dataset.features.keys())
+                removed_fields = extra_fields.intersection(feature_fields)
+                dataset = dataset.remove_columns(removed_fields)
             export_method = Exporter._router()[suffix]
             if self.export_shard_size <= 0:
                 # export the whole dataset into one single file.
@@ -154,15 +185,6 @@ def _export_impl(self, dataset, export_path, suffix, export_stats=True):
                 pool.close()
                 pool.join()
 
-        if Fields.stats in dataset.features and export_stats:
-            # export stats of datasets into a single file.
-            ds_stats = dataset.select_columns(Fields.stats)
-            stats_file = export_path.replace('.' + suffix, '_stats.jsonl')
-            Exporter.to_jsonl(
-                ds_stats,
-                stats_file,
-                num_proc=self.num_proc if self.export_in_parallel else 1)
-
     def export(self, dataset):
         """
         Export method for a dataset.

From d0924eeb8dfee17a537ae7f8fab79a3fce89edd5 Mon Sep 17 00:00:00 2001
From: "lielin.hyl"
Date: Thu, 30 Nov 2023 10:15:56 +0800
Subject: [PATCH 8/8] * bug fixed: unmatched OP name list and actual OP list
 after skipping some OPs whose dependencies are not installed

---
 data_juicer/ops/load.py | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/data_juicer/ops/load.py b/data_juicer/ops/load.py
index 9cee5a120..f6a4bd3d9 100644
--- a/data_juicer/ops/load.py
+++ b/data_juicer/ops/load.py
@@ -17,15 +17,17 @@ def load_ops(process_list, op_fusion=False):
     :return: The op instance list.
     """
     ops = []
+    new_process_list = []
     for process in process_list:
         op_name, args = list(process.items())[0]
         if op_name in UNAVAILABLE_OPERATORS:
            logger.warning(UNAVAILABLE_OPERATORS[op_name].get_warning_msg())
            continue
        ops.append(OPERATORS.modules[op_name](**args))
+        new_process_list.append(process)
 
     # detect filter groups
     if op_fusion:
-        process_list, ops = fuse_operators(process_list, ops)
+        new_process_list, ops = fuse_operators(new_process_list, ops)
 
-    return process_list, ops
+    return new_process_list, ops
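The bug in load_ops was positional: skipping an unavailable OP shrank the
ops list but not process_list, so downstream code paired configs with the
wrong op instances. A toy reproduction of the fixed pairing (the op names
and the 'unavailable' set below are made up for illustration; uppercase
strings stand in for real op instances):

    process_list = [
        {'op_a': {}},  # available
        {'op_b': {}},  # dependency missing -> skipped
        {'op_c': {}},  # available
    ]
    unavailable = {'op_b'}  # stand-in for UNAVAILABLE_OPERATORS

    ops, new_process_list = [], []
    for process in process_list:
        op_name, args = list(process.items())[0]
        if op_name in unavailable:
            continue
        # stand-in for OPERATORS.modules[op_name](**args)
        ops.append(op_name.upper())
        new_process_list.append(process)

    # configs and op instances now stay aligned pairwise
    for cfg, op in zip(new_process_list, ops):
        print(list(cfg.keys())[0], '->', op)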