From a821c8c67c1f293e34f10582a1b5a6d46854e379 Mon Sep 17 00:00:00 2001
From: ChristianGeng
Date: Fri, 26 Jul 2024 10:15:44 +0200
Subject: [PATCH] Polars benchmarks methods (#424)

Benchmarks methods for dependencies using alternative dataframe engine
polars.

---------

Co-authored-by: Christian Geng
Co-authored-by: Hagen Wierstorf
---
 benchmarks/README.md                          |  66 ++
 .../benchmark-dependencies-methods-polars.py  | 297 ++++++
 benchmarks/benchmark-dependencies-methods.py  |   5 +
 .../compare_dependency_methods_polars.py      |  44 +
 benchmarks/dependencies_polars.py             | 982 ++++++++++++++++++
 benchmarks/requirements.txt                   |   2 +
 6 files changed, 1396 insertions(+)
 create mode 100644 benchmarks/benchmark-dependencies-methods-polars.py
 create mode 100644 benchmarks/compare_dependency_methods_polars.py
 create mode 100644 benchmarks/dependencies_polars.py

diff --git a/benchmarks/README.md b/benchmarks/README.md
index 112e69c2..8db4415e 100644
--- a/benchmarks/README.md
+++ b/benchmarks/README.md
@@ -80,6 +80,72 @@ using `pyarrow` dtypes).
 | Dependencies._update_media() | 0.087 | 0.086 | 0.145 |
 | Dependencies._update_media_version(10000 files) | 0.011 | 0.011 | 0.020 |
 
+## audb.Dependencies methods using polars
+
+Handling of the dependency table with `pandas`
+was further compared to handling it with `polars`
+by reimplementing all methods of `audb.Dependencies`
+on top of a `polars` dataframe.
+
+This benchmark was executed on:
+
+* CPU: 12th Gen Intel Core i7-1255U
+* RAM: 15.66 GB
+* Hard Drive: KBG5AZNT1T02 LA KIOXIA
+* Linux: Ubuntu 22.04.4 LTS
+* Python 3.11.9
+
+To run the benchmark, execute:
+
+```bash
+$ python benchmark-dependencies-methods-polars.py
+```
+
+The resulting timings are then compared to
+those of `benchmark-dependencies-methods.py` using
+
+```bash
+$ python compare_dependency_methods_polars.py
+```
+
+Both steps require that `benchmark-dependencies-methods.py`
+has been run before,
+as it creates the test data and result files.
+The `pandas` column of the comparison
+corresponds to the `pyarrow` column
+of the table in the previous section.
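+
+A complete run therefore consists of three steps
+(a sketch, assuming all commands are executed from the `benchmarks/` folder):
+
+```bash
+$ python benchmark-dependencies-methods.py          # creates the test data and cache/results.csv
+$ python benchmark-dependencies-methods-polars.py   # writes cache/results_polars.csv
+$ python compare_dependency_methods_polars.py       # prints the comparison table below
+```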
+ + +| method | pandas | polars | winner | factor | +|-------------------------------------------------|----------|----------|----------|----------| +| Dependencies.\_\_call__() | 0.000 | 0.000 | polars | 2.667 | +| Dependencies.\_\_contains__(10000 files) | 0.003 | 0.002 | polars | 2.005 | +| Dependencies.\_\_get_item__(10000 files) | 0.648 | 0.013 | polars | 50.382 | +| Dependencies.\_\_len__() | 0.000 | 0.000 | pandas | 1.300 | +| Dependencies.\_\_str__() | 0.004 | 0.000 | polars | 24.677 | +| Dependencies._add_attachment() | 0.171 | 0.104 | polars | 1.645 | +| Dependencies._add_media(10000 files) | 0.073 | 0.008 | polars | 9.589 | +| Dependencies._add_meta() | 0.127 | 0.100 | polars | 1.260 | +| Dependencies._drop() | 0.118 | 0.021 | polars | 5.628 | +| Dependencies._remove() | 0.067 | 0.002 | polars | 39.324 | +| Dependencies._update_media() | 0.142 | 0.066 | polars | 2.148 | +| Dependencies._update_media_version(10000 files) | 0.021 | 0.016 | polars | 1.341 | +| Dependencies.archive(10000 files) | 0.045 | 0.014 | polars | 3.250 | +| Dependencies.archives | 0.145 | 0.151 | pandas | 1.045 | +| Dependencies.attachment_ids | 0.018 | 0.008 | polars | 2.375 | +| Dependencies.attachments | 0.017 | 0.008 | polars | 2.194 | +| Dependencies.bit_depth(10000 files) | 0.029 | 0.014 | polars | 2.031 | +| Dependencies.channels(10000 files) | 0.030 | 0.013 | polars | 2.224 | +| Dependencies.checksum(10000 files) | 0.030 | 0.014 | polars | 2.201 | +| Dependencies.duration(10000 files) | 0.028 | 0.014 | polars | 2.066 | +| Dependencies.files | 0.012 | 0.011 | polars | 1.040 | +| Dependencies.format(10000 files) | 0.033 | 0.014 | polars | 2.345 | +| Dependencies.media | 0.068 | 0.040 | polars | 1.702 | +| Dependencies.removed(10000 files) | 0.029 | 0.014 | polars | 2.118 | +| Dependencies.removed_media | 0.068 | 0.038 | polars | 1.809 | +| Dependencies.sampling_rate(10000 files) | 0.029 | 0.014 | polars | 2.102 | +| Dependencies.table_ids | 0.025 | 0.013 | polars | 1.927 | +| Dependencies.tables | 0.017 | 0.008 | polars | 2.166 | +| Dependencies.type(10000 files) | 0.028 | 0.014 | polars | 2.063 | +| Dependencies.version(10000 files) | 0.032 | 0.013 | polars | 2.372 | + ## audb.Dependencies loading/writing to file diff --git a/benchmarks/benchmark-dependencies-methods-polars.py b/benchmarks/benchmark-dependencies-methods-polars.py new file mode 100644 index 00000000..0504ff89 --- /dev/null +++ b/benchmarks/benchmark-dependencies-methods-polars.py @@ -0,0 +1,297 @@ +import random +import time + +import pandas as pd +import tabulate + +import audeer + +import audb + + +random.seed(1) + +cache = audeer.mkdir("./cache") + +CACHE_EXT: str = "pkl" +CACHE_EXT: str = "parquet" +PARQUET_SAVE_OPTS: dict = {"engine": "pyarrow"} +# dtypes : list = ["string", "object", "pyarrow"] +dtypes: list = [ + "polars", +] + + +def set_dependency_module(): + r"""Monkeypatch dependency modult to use polars module.""" + import polars as pl + + from audb.core import define + + depend_index_colname = "file" + depend_index_dtype = pl.datatypes.Object + depend_field_dtypes = dict( + zip( + define.DEPEND_FIELD_DTYPES.keys(), + [ + pl.datatypes.String, + pl.datatypes.Int32, + pl.datatypes.Int32, + pl.datatypes.String, + pl.datatypes.Float64, + pl.datatypes.String, + pl.datatypes.Int32, + pl.datatypes.Int32, + pl.datatypes.Int32, + pl.datatypes.String, + ], + ) + ) + + audb.core.define.DEPEND_INDEX_COLNAME = depend_index_colname + audb.core.define.DEPEND_FIELD_DTYPES_PANDAS = audb.core.define.DEPEND_FIELD_DTYPES + 
audb.core.define.DEPEND_FIELD_DTYPES = depend_field_dtypes + audb.core.define.DEPEND_INDEX_DTYPE_PANDAS = audb.core.define.DEPEND_INDEX_DTYPE + audb.core.define.DEPEND_INDEX_DTYPE = depend_index_dtype + + import dependencies_polars + + audb.Dependencies = dependencies_polars.Dependencies + + +# === Dependencies load via pickle before monkey_patching === +data_cache = audeer.path(cache, "df.pkl") +deps = audb.Dependencies() +deps.load(data_cache) + +# save cache in parquet format as the polars load method depends on it +parquet_cache = audeer.path(cache, "df.parquet") +deps.save(parquet_cache) + +file = "file-10.wav" +n_files = 10000 +results = pd.DataFrame(columns=["polars"]) +results.index.name = "method" +set_dependency_module() +dtype = "polars" + +for dtype in dtypes: + # load them + deps = audb.Dependencies() + deps.load(parquet_cache) + _files = deps._df["file"][:n_files].to_list() + + # only string meanningful + # expected_dtype = pl.String + + # assert deps._df["archive"].dtype == expected_dtype + + method = "Dependencies.__call__()" + t0 = time.time() + # deps() + t = time.time() - t0 + results.at[method, dtype] = t + + # Access the index one time. + # Further calls will be faster + file in deps + + method = f"Dependencies.__contains__({n_files} files)" + t0 = time.time() + [file in deps for file in _files] + t = time.time() - t0 + results.at[method, dtype] = t + + method = f"Dependencies.__get_item__({n_files} files)" + t0 = time.time() + [deps[file] for file in _files] + t = time.time() - t0 + results.at[method, dtype] = t + + method = "Dependencies.__len__()" + t0 = time.time() + len(deps) + t = time.time() - t0 + results.at[method, dtype] = t + + method = "Dependencies.__str__()" + t0 = time.time() + str(deps) + t = time.time() - t0 + results.at[method, dtype] = t + + method = "Dependencies.archives" + t0 = time.time() + deps.archives + t = time.time() - t0 + results.at[method, dtype] = t + + method = "Dependencies.attachments" + t0 = time.time() + deps.attachments + t = time.time() - t0 + results.at[method, dtype] = t + + method = "Dependencies.attachment_ids" + t0 = time.time() + deps.attachment_ids + t = time.time() - t0 + results.at[method, dtype] = t + + method = "Dependencies.files" + t0 = time.time() + deps.files + t = time.time() - t0 + results.at[method, dtype] = t + + method = "Dependencies.media" + t0 = time.time() + deps.media + t = time.time() - t0 + results.at[method, dtype] = t + + method = "Dependencies.removed_media" + t0 = time.time() + deps.removed_media + t = time.time() - t0 + results.at[method, dtype] = t + + method = "Dependencies.table_ids" + t0 = time.time() + deps.table_ids + t = time.time() - t0 + results.at[method, dtype] = t + + method = "Dependencies.tables" + t0 = time.time() + deps.tables + t = time.time() - t0 + results.at[method, dtype] = t + + method = f"Dependencies.archive({n_files} files)" + t0 = time.time() + [deps.archive(file) for file in _files] + t = time.time() - t0 + results.at[method, dtype] = t + + method = f"Dependencies.bit_depth({n_files} files)" + t0 = time.time() + [deps.bit_depth(file) for file in _files] + t = time.time() - t0 + results.at[method, dtype] = t + + method = f"Dependencies.channels({n_files} files)" + t0 = time.time() + [deps.channels(file) for file in _files] + t = time.time() - t0 + results.at[method, dtype] = t + + method = f"Dependencies.checksum({n_files} files)" + t0 = time.time() + [deps.checksum(file) for file in _files] + t = time.time() - t0 + results.at[method, dtype] = t + + method = 
f"Dependencies.duration({n_files} files)" + t0 = time.time() + [deps.duration(file) for file in _files] + t = time.time() - t0 + results.at[method, dtype] = t + + method = f"Dependencies.format({n_files} files)" + t0 = time.time() + [deps.format(file) for file in _files] + t = time.time() - t0 + results.at[method, dtype] = t + + method = f"Dependencies.removed({n_files} files)" + t0 = time.time() + [deps.removed(file) for file in _files] + t = time.time() - t0 + results.at[method, dtype] = t + + method = f"Dependencies.sampling_rate({n_files} files)" + t0 = time.time() + [deps.sampling_rate(file) for file in _files] + t = time.time() - t0 + results.at[method, dtype] = t + + method = f"Dependencies.type({n_files} files)" + t0 = time.time() + [deps.type(file) for file in _files] + t = time.time() - t0 + results.at[method, dtype] = t + + method = f"Dependencies.version({n_files} files)" + t0 = time.time() + [deps.version(file) for file in _files] + t = time.time() - t0 + results.at[method, dtype] = t + + # ------------------------------------------------------------------------- + + # TODO: Reimplement + method = "Dependencies._add_attachment()" + t0 = time.time() + deps._add_attachment("attachment.txt", "1.0.0", "archive", "checksum") + t = time.time() - t0 + results.at[method, dtype] = t + + method = f"Dependencies._add_media({n_files} files)" + values = [ + ( + f"file-new-{n}.wav", # file + f"archive-new-{n}", # archive + 16, # bit_depth + 1, # channels + f"checksum-{n}", # checksum + 0.4, # duration + "wav", # format + 0, # removed + 16000, # sampling_rate + 1, # type + "1.0.0", # version + ) + for n in range(n_files) + ] + t0 = time.time() + deps._add_media(values) + t = time.time() - t0 + results.at[method, dtype] = t + + method = "Dependencies._add_meta()" + t0 = time.time() + deps._add_meta("db.new-table.csv", "1.0.0", "archive", "checksum") + t = time.time() - t0 + results.at[method, dtype] = t + + method = "Dependencies._drop()" + t0 = time.time() + deps._drop(["file-90000.wav"]) + t = time.time() - t0 + results.at[method, dtype] = t + + method = "Dependencies._remove()" + t0 = time.time() + deps._remove(file) + t = time.time() - t0 + results.at[method, dtype] = t + + method = "Dependencies._update_media()" + t0 = time.time() + deps._update_media(values) + t = time.time() - t0 + results.at[method, dtype] = t + + method = f"Dependencies._update_media_version({n_files} files)" + t0 = time.time() + deps._update_media_version([f"file-{n}.wav" for n in range(n_files)], "version") + t = time.time() - t0 + results.at[method, dtype] = t + +# ===== Print results ===== +table = tabulate.tabulate(results, headers="keys", tablefmt="github", floatfmt=".3f") +fp_results = audeer.path(cache, "results_polars.csv") +results.to_csv(fp_results) + +print(table) diff --git a/benchmarks/benchmark-dependencies-methods.py b/benchmarks/benchmark-dependencies-methods.py index 526f1214..02f790ad 100644 --- a/benchmarks/benchmark-dependencies-methods.py +++ b/benchmarks/benchmark-dependencies-methods.py @@ -366,6 +366,11 @@ def astype(df, dtype): t = time.time() - t0 results.at[method, dtype] = t + +# ===== Save results ===== +fp_results = audeer.path(cache, "results.csv") +results.to_csv(fp_results) + # ===== Print results ===== table = tabulate.tabulate(results, headers="keys", tablefmt="github", floatfmt=".3f") print(table) diff --git a/benchmarks/compare_dependency_methods_polars.py b/benchmarks/compare_dependency_methods_polars.py new file mode 100644 index 00000000..1bb89e56 --- /dev/null +++ 
b/benchmarks/compare_dependency_methods_polars.py @@ -0,0 +1,44 @@ +import pandas as pd +import tabulate + +import audeer + + +cache = audeer.mkdir("./cache") +engines = ["", "_polars"] + + +def parse_df(fp, engine): + df = pd.read_csv(fp, index_col="method") + df["engine"] = engine + if engine == "pandas": + df.drop(columns=["string", "object"], inplace=True) + + df.rename(columns={"polars": "t", "pyarrow": "t"}, inplace=True) + return df + + +def get_col(sr): + """Return columns to annotate df.""" + name = sr.idxmin() + return [name, sr.max() / sr.min()] + + +df_polars = parse_df(audeer.path(cache, "results_polars.csv"), "polars") +df_pandas = parse_df(audeer.path(cache, "results.csv"), "pandas") +df = pd.concat([df_pandas, df_polars], axis=0) + +df = df.reset_index().pivot( + index="method", + columns="engine", + values="t", +) + + +df[["winner", "factor"]] = df[["pandas", "polars"]].apply( + lambda x: get_col(x), axis=1, result_type="expand" +) + + +table = tabulate.tabulate(df, headers="keys", tablefmt="github", floatfmt=".3f") +print(table) diff --git a/benchmarks/dependencies_polars.py b/benchmarks/dependencies_polars.py new file mode 100644 index 00000000..8363f694 --- /dev/null +++ b/benchmarks/dependencies_polars.py @@ -0,0 +1,982 @@ +import errno +import os +import re +import tempfile +import typing + +import pandas as pd +import polars as pl +import pyarrow as pa +import pyarrow.csv as csv +import pyarrow.parquet as parquet + +import audbackend +import audeer + +from audb.core import define + + +pl.Config.set_tbl_hide_dataframe_shape(True) +pl.Config.set_tbl_formatting("NOTHING") +pl.Config.set_tbl_hide_column_data_types(True) + + +class Dependencies: + r"""Dependencies of a database. + + :class:`audb.Dependencies` gathers + all database media, table, and attachment files + and metadata about them + in a single object. + The metadata contains information + about the single files + like duration, + but also what version of the file is required. + + The dependencies of a database can be requested with + :func:`audb.dependencies`. + + Examples: + >>> deps = Dependencies() + >>> deps() + Empty DataFrame + Columns: [archive, bit_depth, channels, checksum, duration, format, removed, sampling_rate, type, version] + Index: [] + >>> # Request dependencies for emodb 1.4.1 + >>> deps = audb.dependencies("emodb", version="1.4.1") + >>> # List all files or archives + >>> deps.files[:3] + ['db.emotion.csv', 'db.files.csv', 'wav/03a01Fa.wav'] + >>> deps.archives[:2] + ['005d2b91-5317-0c80-d602-6d55f0323f8c', '014f82d8-3491-fd00-7397-c3b2ac3b2875'] + >>> # Access properties for a given file + >>> deps.archive("wav/03a01Fa.wav") + 'c1f5cc6f-6d00-348a-ba3b-4adaa2436aad' + >>> deps.duration("wav/03a01Fa.wav") + 1.89825 + >>> deps.removed("wav/03a01Fa.wav") + False + >>> # Check if a file is part of the dependencies + >>> "wav/03a01Fa.wav" in deps + True + + """ # noqa: E501 + + def __init__(self): + data = {} + for name, dtype in zip( + define.DEPEND_FIELD_NAMES.values(), + define.DEPEND_FIELD_DTYPES.values(), + ): + # data[name] = pd.Series(dtype=dtype) + data[name] = pl.Series(dtype=dtype) + + # self._df = pd.DataFrame(data) + + # always an empty frame. 
+ # deps can only be loaded or self._df set + self._df = pl.DataFrame(data) + self.index_col = define.DEPEND_INDEX_COLNAME + # self._df.index = self._df.index.astype(define.DEPEND_INDEX_DTYPE) + + # pyarrow schema + # used for reading and writing files + # self._schema = pa.schema( + # [ + # ("file", pa.string()), + # ("archive", pa.string()), + # ("bit_depth", pa.int32()), + # ("channels", pa.int32()), + # ("checksum", pa.string()), + # ("duration", pa.float64()), + # ("format", pa.string()), + # ("removed", pa.int32()), + # ("sampling_rate", pa.int32()), + # ("type", pa.int32()), + # ("version", pa.string()), + # ] + # ) + + # self._schema = utils.pascheme_to_plscheme(self._schema) + # polars the df has a schema, it can be converted to dict + # using dict(self._df.schema) + self._schema = self._df.schema + + def __call__(self) -> pd.DataFrame: + r"""Return dependencies as a table. + + Returns: + table with dependencies + + """ + return self._df + + def __contains__(self, file: str) -> bool: + r"""Check if file is part of dependencies. + + Args: + file: relative file path + + Returns: + ``True`` if a dependency to the file exists + + """ + return file in self._idx + + def __eq__(self, other: "Dependencies") -> bool: + r"""Check if two dependency tables are equal. + + Args: + other: dependency table to compare against + + Returns: + ``True`` if both dependency tables have the same entries + + """ + return self._df.equals(other._df) + + def __getitem__(self, file: str) -> typing.List: + r"""File information. + + Args: + file: relative file path + + Returns: + list with meta information + + + """ + # (self._df[self.index_col] == file).to_list() + # return .loc[file].tolist() + # item = pl.Series(self._df.filter(pl.col(self.index_col) == file)).to_list() + # even slower: + # item = np.array(self._df.filter(pl.col(self.index_col) == file)).tolist()[0] + # pandas: + # self._df.loc[file].tolist() + item = list(self._df.row(self._idx[file]))[1:] + return item + + def __len__(self) -> int: + r"""Number of all media, table, attachment files.""" + return len(self._idx) + + def __str__(self) -> str: # noqa: D105 + n = 15 # number of lines identical to pandas + str_repr = str(self._df.head(n)) + return str_repr + + @property + def archives(self) -> typing.List[str]: + r"""All media, table, attachment archives. + + Return: + list of archives + + + This is not benchmarked at the moment + Current implementation is the fastest of + + self._df["archive"].sort(descending=False).unique(maintain_order=True).to_list() + self._df["archive"].unique(maintain_order=False).sort(descending=False).to_list() + 0.144546 + sorted(self._df["archive"].unique().to_list()) + + Sorting? + + Result: + ['archive-0', 'archive-1', 'archive-2', 'archive-3', ...] + """ + # 0.014 + # return ( + # self._df["archive"] + # .unique(maintain_order=False) + # .sort(descending=False) + # .to_list() + # ) + + # 0.014 + # return ( + # self._df.lazy() + # .select(pl.col("archive")) + # .unique() + # .sort(pl.col("archive")) + # .collect()["archive"] + # .to_list() + # ) + + # 0.014 + return pl.Series(self._df.select(pl.col("archive"))).unique().sort().to_list() + + # 0.413 - very slow to use list sorting + # return sorted( + # pl.Series(self._df.select(pl.col("archive"))).unique().to_list() + # ) + + @property + def attachments(self) -> typing.List[str]: + r"""Attachment paths (can be a file or a folder). 
+ + Returns: + list of attachments + + """ + deptype = define.DependType.ATTACHMENT + return self._df.filter(pl.col("type") == deptype)[self.index_col].to_list() + + @property + def attachment_ids(self) -> typing.List[str]: + r"""Attachment IDs. + + Returns: + list of attachment IDs + + """ + return self._df.filter(pl.col("type") == define.DependType.ATTACHMENT)[ + "archive" + ].to_list() + + @property + def files(self) -> typing.List[str]: + r"""All media, table, attachments. + + Returns: + list of files + + """ + return list(self._idx.keys()) + + @property + def media(self) -> typing.List[str]: + r"""Media files. + + Returns: + list of media + + """ + return self._df.filter(pl.col("type") == define.DependType.MEDIA)[ + self.index_col + ].to_list() + + @property + def removed_media(self) -> typing.List[str]: + r"""Removed media files. + + Returns: + list of media + + """ + return self._df.filter( + (pl.col("type") == define.DependType.MEDIA) & (self._df["removed"] == 1) + )[self.index_col].to_list() + + @property + def table_ids(self) -> typing.List[str]: + r"""Table IDs. + + Like :meth:`audb.Dependencies.tables`, + but only returns the table ID, + i.e. ``db..csv``. + + Returns: + list of table IDs + + """ + return [table[3:-4] for table in self.tables] + + @property + def tables(self) -> typing.List[str]: + r"""Tables files. + + Returns: + list of tables + + """ + return self._df.filter(pl.col("type") == define.DependType.META)[ + self.index_col + ].to_list() + + def archive(self, file: str) -> str: + r"""Name of archive the file belongs to. + + Args: + file: relative file path + + Returns: + archive name + + Example: 'archive-0' + """ + # return self._df.filter(pl.col("file") == file).item(0, "archive") + # pandas implementation + # return self._df.archive[file] + return self._df.row(self._idx[file], named=True)["archive"] + + def bit_depth(self, file: str) -> int: + r"""Bit depth of media file. + + Args: + file: relative file path + + Returns: + bit depth + + """ + # works for both pandas and polars + return self._column_loc("bit_depth", file, int) + + def channels(self, file: str) -> int: + r"""Number of channels of media file. + + Args: + file: relative file path + + Returns: + number of channels + + """ + return self._column_loc("channels", file, int) + + def checksum(self, file: str) -> str: + r"""Checksum of file. + + Args: + file: relative file path + + Returns: + checksum of file + + """ + return self._column_loc("checksum", file) + + def duration(self, file: str) -> float: + r"""Duration of file. + + Args: + file: relative file path + + Returns: + duration in seconds + + """ + return self._column_loc("duration", file, float) + + def format(self, file: str) -> str: + r"""Format of file. + + Args: + file: relative file path + + Returns: + file format (always lower case) + + """ + return self._column_loc("format", file) + + def _update_idx(self): + """Update index as polars does not have any.""" + self._idx = dict( + zip(self._df[self.index_col], range(len(self._df[self.index_col]))) + ) + + def load(self, path: str): + r"""Read dependencies from file. + + Clears existing dependencies. + + Args: + path: path to file. 
+                File extension can be ``csv``,
+                ``pkl``,
+                or ``parquet``
+
+        Raises:
+            ValueError: if file extension is not one of
+                ``csv``, ``pkl``, ``parquet``
+            FileNotFoundError: if ``path`` does not exist
+
+        """
+        path = audeer.path(path)
+        extension = audeer.file_extension(path)
+        if extension not in ["csv", "pkl", "parquet"]:
+            raise ValueError(
+                f"File extension of 'path' has to be 'csv', 'pkl', or 'parquet' "
+                f"not '{extension}'"
+            )
+        if not os.path.exists(path):
+            raise FileNotFoundError(
+                errno.ENOENT,
+                os.strerror(errno.ENOENT),
+                path,
+            )
+        if extension == "pkl":
+            self._df = pd.read_pickle(path)
+            # Correct dtype of index
+            # to make backward compatible
+            # with old pickle files in cache
+            # that might use `string` as dtype
+            if self._df.index.dtype != define.DEPEND_INDEX_DTYPE:
+                self._df.index = self._df.index.astype(define.DEPEND_INDEX_DTYPE)
+
+        elif extension == "csv":
+            table = csv.read_csv(
+                path,
+                read_options=csv.ReadOptions(
+                    column_names=self._schema.names,
+                    skip_rows=1,
+                ),
+                convert_options=csv.ConvertOptions(column_types=self._schema),
+            )
+            self._df = self._table_to_dataframe(table)
+
+        elif extension == "parquet":
+            table = parquet.read_table(path)
+            # self._df = self._table_to_dataframe(table)
+            # self._df = pl.read_parquet(path)
+            # there is no index!
+            # self._df = pl.read_parquet(path).rename(
+            #     {"__index_level_0__": self.index_col}
+            # )
+            # newer polars versions read the index column (named `file`) directly
+            self._df = pl.read_parquet(path)
+            # need to maintain an index
+            self._update_idx()
+
+    def removed(self, file: str) -> bool:
+        r"""Check if file is marked as removed.
+
+        Args:
+            file: relative file path
+
+        Returns:
+            ``True`` if file was removed
+
+        """
+        return self._column_loc("removed", file, bool)
+
+    def sampling_rate(self, file: str) -> int:
+        r"""Sampling rate of media file.
+
+        Args:
+            file: relative file path
+
+        Returns:
+            sampling rate in Hz
+
+        """
+        return self._column_loc("sampling_rate", file, int)
+
+    def save(self, path: str):
+        r"""Write dependencies to file.
+
+        Args:
+            path: path to file.
+                File extension can be ``csv``, ``pkl``, or ``parquet``
+
+        """
+        path = audeer.path(path)
+        if path.endswith("csv"):
+            table = self._dataframe_to_table(self._df)
+            csv.write_csv(
+                table,
+                path,
+                write_options=csv.WriteOptions(quoting_style="none"),
+            )
+        elif path.endswith("pkl"):
+            self._df.to_pickle(
+                path,
+                protocol=4,  # supported by Python >= 3.4
+            )
+        elif path.endswith("parquet"):
+            table = self._dataframe_to_table(self._df, file_column=True)
+            parquet.write_table(table, path)
+
+    def type(self, file: str) -> int:
+        r"""Type of file.
+
+        Args:
+            file: relative file path
+
+        Returns:
+            type
+
+        """
+        return self._column_loc("type", file, int)
+
+    def version(self, file: str) -> str:
+        r"""Version of file.
+
+        Args:
+            file: relative file path
+
+        Returns:
+            version string
+
+        """
+        return self._column_loc("version", file)
+
+    def _add_attachment(
+        self,
+        file: str,
+        version: str,
+        archive: str,
+        checksum: str,
+    ):
+        r"""Add or update attachment.
+
+        Args:
+            file: relative path of attachment
+            version: version string
+            archive: archive name without extension
+            checksum: checksum of file
+
+        Note: this relies on ``pl.DataFrame.update``,
+        which the polars documentation marks as unstable.
+
+        """
+        format = audeer.file_extension(file).lower()
+        entry = [
+            file,
+            archive,  # archive
+            0,  # bit_depth
+            0,  # channels
+            checksum,  # checksum
+            0.0,  # duration
+            format,  # format
+            0,  # removed
+            0,  # sampling_rate
+            define.DependType.ATTACHMENT,  # type
+            version,  # version
+        ]
+
+        self._df = self._df.update(
+            pl.from_records([entry], schema=self._df.schema, orient="row"),
+            how="full",
+            left_on=["file"],
+            right_on=["file"],
+        )
+
+        # update the index only if a new entry is coming in
+        if file not in self._idx:
+            self._idx[file] = len(self._idx)
+
+    def _add_media(
+        self,
+        values: typing.Sequence[
+            typing.Tuple[
+                str,  # file
+                str,  # archive
+                int,  # bit_depth
+                int,  # channels
+                str,  # checksum
+                float,  # duration
+                str,  # format
+                int,  # removed
+                float,  # sampling_rate
+                int,  # type
+                str,  # version
+            ]
+        ],
+    ):
+        r"""Add media files.
+
+        Args:
+            values: list of tuples,
+                where each tuple holds the values of a new media entry
+
+        """
+        self._df = pl.concat(
+            [self._df, pl.from_records(values, schema=self._df.schema, orient="row")],
+            how="vertical_relaxed",
+        )
+
+    def _add_meta(
+        self,
+        file: str,
+        version: str,
+        archive: str,
+        checksum: str,
+    ):
+        r"""Add or update table file.
+
+        Args:
+            file: relative file path
+            archive: archive name without extension
+            checksum: checksum of file
+            version: version string
+
+        """
+        format = audeer.file_extension(file).lower()
+
+        entry = [
+            file,
+            archive,  # archive
+            0,  # bit_depth
+            0,  # channels
+            checksum,  # checksum
+            0.0,  # duration
+            format,  # format
+            0,  # removed
+            0,  # sampling_rate
+            define.DependType.META,  # type
+            version,  # version
+        ]
+
+        self._df = self._df.update(
+            pl.from_records([entry], schema=self._df.schema, orient="row"),
+            how="full",
+            left_on=["file"],
+            right_on=["file"],
+        )
+
+        # update the index only if a new entry is coming in
+        if file not in self._idx:
+            self._idx[file] = len(self._idx)
+
+    def _column_loc(
+        self,
+        column: str,
+        file: str,
+        dtype: typing.Callable = None,
+    ) -> typing.Any:
+        r"""Column content for selected files and column."""
+        value = self._df.row(self._idx[file], named=True)[column]
+        if dtype is not None:
+            value = dtype(value)
+        return value
+
+    def _dataframe_to_table(
+        self,
+        df: pd.DataFrame,
+        *,
+        file_column: bool = False,
+    ) -> pa.Table:
+        r"""Convert pandas dataframe to pyarrow table.
+
+        Args:
+            df: dependency table as pandas dataframe
+            file_column: if ``False``
+                the ``"file"`` column
+                is renamed to ``""``
+
+        Returns:
+            dependency table as pyarrow table
+
+        """
+        table = pa.Table.from_pandas(
+            df.reset_index().rename(columns={"index": "file"}),
+            preserve_index=False,
+            schema=self._schema,
+        )
+        if not file_column:
+            columns = table.column_names
+            columns = ["" if c == "file" else c for c in columns]
+            table = table.rename_columns(columns)
+        return table
+
+    def _drop(self, files: typing.Sequence[str]):
+        r"""Drop files from table.
+
+        Args:
+            files: relative file paths
+
+        """
+        # self._df.drop is slow,
+        # see https://stackoverflow.com/a/53394627.
+        # The solution presented in https://stackoverflow.com/a/53395360
+        # self._df = self._df.loc[self._df.index.drop(files)]
+        # which is claimed to be faster,
+        # isn't.
+        # pandas:
+        # self._df = self._df[~self._df.index.isin(files)]
+
+        # polars
+        self._df = self._df.filter(~self._df[self.index_col].is_in(files))
+
+    def _remove(self, file: str):
+        r"""Mark file as removed.
+
+        Args:
+            file: relative file path
+
+        """
+        self._df = self._df.with_columns(
+            pl.when(pl.col("file") == file)
+            .then(1)
+            .otherwise(pl.col("removed"))
+            .alias("removed")
+        )
+
+    def _table_to_dataframe(self, table: pa.Table) -> pd.DataFrame:
+        r"""Convert pyarrow table to pandas dataframe.
+
+        Args:
+            table: dependency table as pyarrow table
+
+        Returns:
+            dependency table as pandas dataframe
+
+        """
+        df = table.to_pandas(
+            deduplicate_objects=False,
+            # Convert to pyarrow dtypes,
+            # but ensure we use pd.StringDtype("pyarrow")
+            # and not pd.ArrowDtype(pa.string())
+            # see https://pandas.pydata.org/docs/user_guide/pyarrow.html
+            types_mapper={
+                pa.string(): pd.StringDtype("pyarrow"),
+                pa.int32(): pd.ArrowDtype(pa.int32()),
+                pa.float64(): pd.ArrowDtype(pa.float64()),
+            }.get,  # we have to provide a callable, not a dict
+        )
+        df.set_index("file", inplace=True)
+        df.index.name = None
+        df.index = df.index.astype(define.DEPEND_INDEX_DTYPE)
+        return df
+
+    def _update_media(
+        self,
+        values: typing.Sequence[
+            typing.Tuple[
+                str,  # file
+                str,  # archive
+                int,  # bit_depth
+                int,  # channels
+                str,  # checksum
+                float,  # duration
+                str,  # format
+                int,  # removed
+                float,  # sampling_rate
+                int,  # type
+                str,  # version
+            ]
+        ],
+    ):
+        r"""Update media files.
+
+        Args:
+            values: list of tuples,
+                where each tuple holds the new values for a media entry
+
+        """
+        # Slow implementations:
+        #
+        # newer concat
+        # self._df = pl.concat(
+        #     [self._df, pl.from_records(values, schema=self._df.schema)],
+        #     how="vertical_relaxed",
+        # )
+
+        # predicate replacement example
+        # self._df.with_columns(
+        #     pl.when(pl.col("file") == file)
+        #     .then(1)
+        #     .otherwise(pl.col("removed")).alias("removed")
+        # )
+
+        self._df = self._df.update(
+            pl.from_records(values, schema=self._df.schema, orient="row")
+        )
+        # no need to update self._idx as this is updating existing media
+
+    def _update_media_version(
+        self,
+        files: typing.Sequence[str],
+        version: str,
+    ):
+        r"""Update version of media files.
+
+        Args:
+            files: relative file paths
+            version: version string
+
+        """
+        # NOTE: pandas way of counting index
+        field = define.DEPEND_FIELD_NAMES[define.DependField.VERSION]
+        # using new data allocation!
+        self._df = pl.concat(
+            [
+                self._df.filter(~pl.col(self.index_col).is_in(files)),
+                self._df.filter(pl.col(self.index_col).is_in(files)).with_columns(
+                    pl.col(field).str.replace(".*", version)
+                ),
+            ],
+            how="vertical",
+        )
+        # no need to update self._idx as we only update existing entries
+
+
+def error_message_missing_object(
+    object_type: str,
+    missing_object_id: typing.Union[str, typing.Sequence],
+    database_name: str = None,
+    database_version: str = None,
+) -> str:
+    r"""Error message for missing objects.
+
+    Args:
+        object_type: object that is supposed to contain ``missing_object_id``,
+            should be
+            ``'media'``,
+            ``'table'``
+            or ``'attachment'``
+        missing_object_id: ID of missing object
+        database_name: name of affected database
+        database_version: version of affected database
+
+    Returns:
+        error message
+
+    """
+    if object_type == "media":
+        object_name = f"{object_type} file"
+    else:
+        object_name = object_type
+
+    if isinstance(missing_object_id, str):
+        msg = f"Could not find a {object_name} " f"matching '{missing_object_id}'"
+    else:
+        msg = f"Could not find the {object_name} " f"'{missing_object_id[0]}'"
+    if database_name is not None and database_version is not None:
+        msg += f" in {database_name} v{database_version}"
+    return msg
+
+
+def filter_deps(
+    requested_deps: typing.Optional[typing.Union[str, typing.Sequence[str]]],
+    available_deps: typing.Sequence[str],
+    deps_type: str,
+    database_name: str = None,
+    database_version: str = None,
+) -> typing.Sequence[str]:
+    r"""Filter dependency files by requested files.
+
+    Args:
+        requested_deps: include only media, tables
+            matching the regular expression
+            or provided in the list
+        available_deps: sequence of available media files or tables
+        deps_type: ``'attachment'``, ``'media'`` or ``'table'``
+        database_name: name of affected database
+        database_version: version of affected database
+
+    Returns:
+        list of attachments, media or tables
+        inside the dependency object
+        matching ``requested_deps``
+
+    """
+    if requested_deps is None:
+        return available_deps
+    elif len(requested_deps) == 0:
+        return []
+
+    if isinstance(requested_deps, str):
+        request = requested_deps
+        pattern = re.compile(request)
+        requested_deps = []
+        for dep in available_deps:
+            if pattern.search(dep):
+                requested_deps.append(dep)
+        if len(requested_deps) == 0:
+            msg = error_message_missing_object(
+                deps_type,
+                request,
+                database_name,
+                database_version,
+            )
+            raise ValueError(msg)
+    else:
+        for dep in requested_deps:
+            if dep not in available_deps:
+                msg = error_message_missing_object(
+                    deps_type,
+                    [dep],
+                    database_name,
+                    database_version,
+                )
+                raise ValueError(msg)
+
+    return requested_deps
+
+
+def download_dependencies(
+    backend_interface: typing.Type[audbackend.interface.Base],
+    name: str,
+    version: str,
+    verbose: bool,
+) -> Dependencies:
+    r"""Load dependency file from backend.
+
+    Download dependency file
+    for requested database
+    to a temporary folder,
+    and return a dependency object
+    loaded from that file.
+ + Args: + backend_interface: backend interface + name: database name + version: database version + verbose: if ``True`` a message is shown during download + + Returns: + dependency object + + """ + with tempfile.TemporaryDirectory() as tmp_root: + # Load `db.parquet` file, + # or if non-existent `db.zip` + # from backend + remote_deps_file = backend_interface.join("/", name, define.DEPENDENCIES_FILE) + if backend_interface.exists(remote_deps_file, version): + local_deps_file = os.path.join(tmp_root, define.DEPENDENCIES_FILE) + backend_interface.get_file( + remote_deps_file, + local_deps_file, + version, + verbose=verbose, + ) + else: + remote_deps_file = backend_interface.join("/", name, define.DB + ".zip") + local_deps_file = os.path.join( + tmp_root, + define.LEGACY_DEPENDENCIES_FILE, + ) + backend_interface.get_archive( + remote_deps_file, + tmp_root, + version, + verbose=verbose, + ) + # Create deps object from downloaded file + deps = Dependencies() + deps.load(local_deps_file) + return deps + + +def upload_dependencies( + backend_interface: typing.Type[audbackend.interface.Base], + deps: Dependencies, + db_root: str, + name: str, + version: str, +): + r"""Upload dependency file to backend. + + Store a dependency file + in the local database root folder, + and upload it to the backend. + + Args: + backend_interface: backend interface + deps: dependency object + db_root: database root folder + name: database name + version: database version + + """ + local_deps_file = os.path.join(db_root, define.DEPENDENCIES_FILE) + remote_deps_file = backend_interface.join("/", name, define.DEPENDENCIES_FILE) + deps.save(local_deps_file) + backend_interface.put_file(local_deps_file, remote_deps_file, version) diff --git a/benchmarks/requirements.txt b/benchmarks/requirements.txt index bcf2fff9..f4aa67c5 100644 --- a/benchmarks/requirements.txt +++ b/benchmarks/requirements.txt @@ -1,3 +1,5 @@ memray ==1.11.0 +pandas +polars pyarrow tabulate
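
The central idea of `dependencies_polars.py` is that polars has no label-based row index like pandas, so the class keeps a plain dict (`self._idx`) mapping each file to its row number and reads single rows with `DataFrame.row()`. A minimal standalone sketch of that lookup pattern (column subset and values are illustrative only, not taken from a real database):

```python
import polars as pl

# Toy dependency table; the real table holds all columns of audb.Dependencies.
df = pl.DataFrame(
    {
        "file": ["wav/a.wav", "wav/b.wav"],
        "archive": ["archive-0", "archive-1"],
        "duration": [1.5, 2.0],
    }
)

# file -> row number, rebuilt whenever rows are added or dropped
idx = dict(zip(df["file"], range(len(df))))

# dict lookup plus direct row access replaces pandas' .loc[file]
row = df.row(idx["wav/b.wav"], named=True)
print(row["duration"])  # 2.0
```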