Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Better Analyzer support for stats of multimodal OPs & other minor enhancements #106

Merged
merged 8 commits into from
Nov 30, 2023
12 changes: 9 additions & 3 deletions data_juicer/analysis/column_wise_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import matplotlib.pyplot as plt
import pandas as pd
from tqdm import tqdm

from data_juicer.utils.constant import Fields

Expand Down Expand Up @@ -111,8 +112,11 @@ def analyse(self, show_percentiles=False, show=False):
fig = plt.figure(figsize=(rec_width, rec_height),
layout='constrained')
subfigs = fig.subfigures(rec_row, rec_col, wspace=0.01)
for i, column_name in enumerate(columns):
for i, column_name in enumerate(tqdm(columns.to_list(),
desc='Column')):
data = self.stats[column_name]
# explode data to flatten inner list
data = data.explode().infer_objects()
grid = grid_indexes[i]
if self.save_stats_in_one_file:
if rec_col == 1:
Expand All @@ -128,7 +132,8 @@ def analyse(self, show_percentiles=False, show=False):

# numeric or string via nan. Apply different plot method for them.
if pd.isna(self.overall_result[column_name].get('top')):
# numeric -- draw histogram and box plot for this stat
# numeric or numeric list -- draw histogram and box plot for
# this stat
percentiles = self.overall_result[column_name] \
if show_percentiles else None

Expand All @@ -152,7 +157,8 @@ def analyse(self, show_percentiles=False, show=False):
f'{column_name}-box.png'),
percentiles=percentiles)
else:
# object (string) -- only draw histogram for this stat
# object (string) or string list -- only draw histogram for
# this stat
if self.save_stats_in_one_file:
axes = subfig.subplots(1, 1)
else:
Expand Down
54 changes: 52 additions & 2 deletions data_juicer/analysis/overall_analysis.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
import os
from multiprocessing import Pool

import pandas as pd
from loguru import logger
from tqdm import tqdm

from data_juicer.utils.constant import Fields


def _single_column_analysis(col, *args, **kwargs):
col_overall = col.describe(*args, **kwargs)
return col_overall


class OverallAnalysis:
"""Apply analysis on the overall stats, including mean, std, quantiles,
etc."""
Expand All @@ -23,18 +31,60 @@ def __init__(self, dataset, output_path):

# default percentiles to analyse
self.default_percentiles = [0.25, 0.5, 0.75]
# supported dtypes of column to be analysed
# Notice: there won't be mixed types in a column because the stats is
# obtained from Dataset, which doesn't allow mixed types.
# Notice: for now, stats can only be:
# {numbers, string, list of one of before}
self.supported_object_types = {str, list}

def refine_single_column(self, col):
    """Refine a stats column into a dtype that ``describe`` can analyse.

    :param col: a pandas Series holding the stats of one column.
    :return: the refined Series, or None if the column holds an
        unsupported object type and should be skipped.
    """
    if col.dtype != 'object':
        # already a concrete (numeric) dtype -- nothing to refine
        return col
    if len(col) == 0:
        # empty column: nothing to inspect, let describe() handle it
        return col
    # if the type of this column is object, we can decide the actual type
    # according to the first element. Use iloc so this also works when the
    # Series carries a non-default index (e.g. after filtering/exploding),
    # where label-based col[0] would raise a KeyError.
    first = col.iloc[0]
    if type(first) not in self.supported_object_types:
        logger.warning(f'There is a column of stats with type '
                       f'[{type(first)}], which is not supported to be '
                       f'analysed for now.')
        return None
    if type(first) is str:
        # describe(include='all') can analyse string columns directly
        return col
    # list -- flatten the inner lists and re-infer the element dtype
    return col.explode().infer_objects()

def analyse(self, percentiles=[], num_proc=1):
"""
Apply overall analysis on the whole dataset based on the describe
method of pandas.

:param percentiles: percentiles to analyse
:param num_proc: number of processes to analyse the dataset
:return: the overall analysis result.
"""
# merge default and customized percentiles and get overall information
percentiles = list(set(percentiles + self.default_percentiles))
overall = self.stats.describe(percentiles=percentiles, include='all')

results = []
pool = Pool(num_proc)
for col_name in self.stats.columns:
this_col = self.refine_single_column(self.stats[col_name])
res = pool.apply_async(_single_column_analysis,
kwds={
'col': this_col,
'percentiles': percentiles,
'include': 'all',
})
results.append(res)
pool.close()
pool.join()
result_cols = [res.get() for res in tqdm(results)]
overall = pd.DataFrame(result_cols).T

# export to result report file
overall.to_csv(os.path.join(self.output_path, 'overall.csv'))
Expand Down
14 changes: 14 additions & 0 deletions data_juicer/config/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,20 @@ def init_configs(args=None):
'due to the IO blocking, especially for very large datasets. '
'When this happens, False is a better choice, although it takes '
'more time.')
parser.add_argument(
'--keep_stats_in_res_ds',
type=bool,
default=False,
help='Whether to keep the computed stats in the result dataset. If '
'it\'s False, the intermediate fields to store the stats '
'computed by Filters will be removed. Default: False.')
parser.add_argument(
'--keep_hashes_in_res_ds',
type=bool,
default=False,
help='Whether to keep the computed hashes in the result dataset. If '
'it\'s False, the intermediate fields to store the hashes '
'computed by Deduplicators will be removed. Default: False.')
parser.add_argument('--np',
type=PositiveInt,
default=4,
Expand Down
38 changes: 21 additions & 17 deletions data_juicer/core/analyser.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from data_juicer.utils import cache_utils
from data_juicer.utils.constant import Fields

from .data import add_same_content_to_new_column
from .exporter import Exporter


Expand Down Expand Up @@ -87,12 +88,14 @@ def run(self, load_data_np=None):
op_name = list(op_cfg.keys())[0]
if isinstance(op, Filter):
if Fields.stats not in dataset.features:
# TODO:
# this is a temp solution,
# only add stats when calling filter op
dataset = dataset.add_column(name=Fields.stats,
column=[{}] *
dataset.num_rows)
dataset = dataset.map(add_same_content_to_new_column,
fn_kwargs={
'new_column_name': Fields.stats,
'initial_value': {}
},
num_proc=self.cfg.np,
desc='Adding new column for stats')
dataset = dataset.map(op.compute_stats,
num_proc=self.cfg.np,
desc=op_name + '_compute_stats')
Expand All @@ -102,15 +105,22 @@ def run(self, load_data_np=None):
'the process list in configs.')
return dataset

# 3. analysis and output result to the export path
# 3.1. Only consider fields in Fields.stats
# 3.2. For string fields, only consider its histogram
# 3.3. For numeric fields, consider its histogram and box
# 3.4. Otherwise, DO NOT analyse
# 3. data export
logger.info('Exporting dataset to disk...')
self.exporter.export(dataset)
if self.cfg.use_cache and self.cfg.cache_compress:
from data_juicer.utils.compress import compress
compress(dataset)

# 4. analysis and output result to the export path
# 4.1. Only consider fields in Fields.stats
# 4.2. For string fields, only consider its histogram
# 4.3. For numeric fields, consider its histogram and box
# 4.4. Otherwise, DO NOT analyse

logger.info('Applying overall analysis on stats...')
overall_analysis = OverallAnalysis(dataset, self.analysis_path)
self.overall_result = overall_analysis.analyse()
self.overall_result = overall_analysis.analyse(num_proc=self.cfg.np)

logger.info('Applying column-wise analysis on stats...')
column_wise_analysis = ColumnWiseAnalysis(
Expand All @@ -120,10 +130,4 @@ def run(self, load_data_np=None):
save_stats_in_one_file=self.cfg.save_stats_in_one_file)
column_wise_analysis.analyse()

# 4. data export
logger.info('Exporting dataset to disk...')
self.exporter.export(dataset)
if self.cfg.use_cache and self.cfg.cache_compress:
from data_juicer.utils.compress import compress
compress(dataset)
return dataset
14 changes: 14 additions & 0 deletions data_juicer/core/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,3 +317,17 @@ def nested_query(root_obj: Union[NestedDatasetDict, NestedDataset,
return None

return None


def add_same_content_to_new_column(sample,
                                   new_column_name,
                                   initial_value=None):
    """
    Helper intended for ``Dataset.map``: fills a new column/field of every
    sample with one shared initial value. Mapping this function in parallel
    is a faster alternative to ``add_column``.

    :param sample: a single sample to add this new column/field.
    :param new_column_name: the name of this new column/field.
    :param initial_value: the initial value of this new column/field.
    :return: the same sample, mutated in place with the new field set.
    """
    sample.update({new_column_name: initial_value})
    return sample
24 changes: 16 additions & 8 deletions data_juicer/core/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from data_juicer.utils.ckpt_utils import CheckpointManager
from data_juicer.utils.constant import Fields

from .data import add_same_content_to_new_column
from .exporter import Exporter
from .tracer import Tracer

Expand Down Expand Up @@ -65,9 +66,13 @@ def __init__(self, cfg=None):

# prepare exporter and check export path suffix
logger.info('Preparing exporter...')
self.exporter = Exporter(self.cfg.export_path,
self.cfg.export_shard_size,
self.cfg.export_in_parallel, self.cfg.np)
self.exporter = Exporter(
self.cfg.export_path,
self.cfg.export_shard_size,
self.cfg.export_in_parallel,
self.cfg.np,
keep_stats_in_res_ds=self.cfg.keep_stats_in_res_ds,
keep_hashes_in_res_ds=self.cfg.keep_hashes_in_res_ds)

# setup tracer
self.open_tracer = self.cfg.open_tracer
Expand Down Expand Up @@ -125,12 +130,15 @@ def run(self, load_data_np=None):
op.text_key)
elif isinstance(op, Filter):
if Fields.stats not in dataset.features:
# TODO:
# this is a temp solution,
# only add stats when calling filter op
dataset = dataset.add_column(name=Fields.stats,
column=[{}] *
dataset.num_rows)
dataset = dataset.map(
add_same_content_to_new_column,
fn_kwargs={
'new_column_name': Fields.stats,
'initial_value': {}
},
num_proc=self.cfg.np,
desc='Adding new column for stats')
if self.cfg.use_checkpoint:
prev = dataset
dataset = dataset.map(op.compute_stats,
Expand Down
42 changes: 32 additions & 10 deletions data_juicer/core/exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from loguru import logger

from data_juicer.utils.constant import Fields
from data_juicer.utils.constant import Fields, HashKeys


class Exporter:
Expand All @@ -21,6 +21,8 @@ def __init__(self,
export_in_parallel=True,
num_proc=1,
export_ds=True,
keep_stats_in_res_ds=False,
keep_hashes_in_res_ds=False,
export_stats=True):
"""
Initialization method.
Expand All @@ -31,12 +33,18 @@ def __init__(self,
to a single file.
:param num_proc: number of process to export the dataset.
:param export_ds: whether to export the dataset contents.
:param keep_stats_in_res_ds: whether to keep stats in the result
dataset.
:param keep_hashes_in_res_ds: whether to keep hashes in the result
dataset.
:param export_stats: whether to export the stats of dataset.
"""
self.export_path = export_path
self.export_shard_size = export_shard_size
self.export_in_parallel = export_in_parallel
self.export_ds = export_ds
self.keep_stats_in_res_ds = keep_stats_in_res_ds
self.keep_hashes_in_res_ds = keep_hashes_in_res_ds
self.export_stats = export_stats
self.suffix = self._get_suffix(export_path)
self.num_proc = num_proc
Expand Down Expand Up @@ -98,8 +106,31 @@ def _export_impl(self, dataset, export_path, suffix, export_stats=True):
:param export_stats: whether to export stats of dataset.
:return:
"""
if Fields.stats in dataset.features and export_stats:
# export stats of datasets into a single file.
logger.info('Exporting computed stats into a single file...')
ds_stats = dataset.select_columns(Fields.stats)
stats_file = export_path.replace('.' + suffix, '_stats.jsonl')
Exporter.to_jsonl(
ds_stats,
stats_file,
num_proc=self.num_proc if self.export_in_parallel else 1)

if self.export_ds:
# fetch the corresponding export method according to the suffix
if not self.keep_stats_in_res_ds:
extra_fields = {Fields.stats}
feature_fields = set(dataset.features.keys())
removed_fields = extra_fields.intersection(feature_fields)
dataset = dataset.remove_columns(removed_fields)
if not self.keep_hashes_in_res_ds:
extra_fields = {
HashKeys.hash, HashKeys.minhash, HashKeys.simhash,
HashKeys.imagehash
}
feature_fields = set(dataset.features.keys())
removed_fields = extra_fields.intersection(feature_fields)
dataset = dataset.remove_columns(removed_fields)
export_method = Exporter._router()[suffix]
if self.export_shard_size <= 0:
# export the whole dataset into one single file.
Expand Down Expand Up @@ -154,15 +185,6 @@ def _export_impl(self, dataset, export_path, suffix, export_stats=True):
pool.close()
pool.join()

if Fields.stats in dataset.features and export_stats:
# export stats of datasets into a single file.
ds_stats = dataset.select_columns(Fields.stats)
stats_file = export_path.replace('.' + suffix, '_stats.jsonl')
Exporter.to_jsonl(
ds_stats,
stats_file,
num_proc=self.num_proc if self.export_in_parallel else 1)

def export(self, dataset):
"""
Export method for a dataset.
Expand Down
15 changes: 11 additions & 4 deletions data_juicer/format/formatter.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def load_dataset(self, num_proc: int = 1, global_cfg=None) -> Dataset:
**self.kwargs)
if self.add_suffix:
logger.info('Add suffix info into dataset...')
datasets = add_suffixes(datasets)
datasets = add_suffixes(datasets, num_proc)
else:
from data_juicer.core.data import NestedDataset
datasets = NestedDataset(
Expand Down Expand Up @@ -120,18 +120,25 @@ def load_dataset(self, num_proc: int = 1, global_cfg=None) -> Dataset:
return ds


def add_suffixes(datasets: DatasetDict, num_proc: int = 1) -> Dataset:
    """
    Add a suffix field to each split of the datasets, then concatenate
    all splits into a single dataset.

    :param datasets: a DatasetDict object whose keys are the file suffixes
        (without the leading dot) of the loaded splits.
    :param num_proc: number of processes to add suffixes.
    :return: datasets with suffix features, merged into one NestedDataset.
    """
    logger.info('Add suffix column for dataset')
    from data_juicer.core.data import add_same_content_to_new_column
    for key, ds in datasets.items():
        if Fields.suffix not in ds.features:
            # map with a constant-value function is faster than add_column
            # for large datasets and can run in parallel
            datasets[key] = ds.map(add_same_content_to_new_column,
                                   fn_kwargs={
                                       'new_column_name': Fields.suffix,
                                       'initial_value': '.' + key
                                   },
                                   num_proc=num_proc,
                                   desc='Adding new column for suffix')
    # iterate values directly instead of discarding keys from items()
    merged = concatenate_datasets(list(datasets.values()))
    from data_juicer.core.data import NestedDataset
    return NestedDataset(merged)
Expand Down
Loading