diff --git a/doc/source/io.rst b/doc/source/io.rst
index 68faefa872c88..13828200f61cd 100644
--- a/doc/source/io.rst
+++ b/doc/source/io.rst
@@ -4668,6 +4668,43 @@ Passing ``index=True`` will *always* write the index, even if that's not the
 underlying engine's default behavior.
 
+Partitioning Parquet files
+''''''''''''''''''''''''''
+
+.. versionadded:: 0.24.0
+
+Parquet supports partitioning of data based on the values of one or more columns.
+
+.. ipython:: python
+
+    df = pd.DataFrame({'a': [0, 0, 1, 1], 'b': [0, 1, 0, 1]})
+    df.to_parquet(fname='test', engine='pyarrow', partition_cols=['a'], compression=None)
+
+The `fname` specifies the parent directory to which data will be saved.
+The `partition_cols` are the column names by which the dataset will be partitioned.
+Columns are partitioned in the order they are given. The partition splits are
+determined by the unique values in the partition columns.
+The above example creates a partitioned dataset that may look like:
+
+.. code-block:: text
+
+    test
+    ├── a=0
+    │   ├── 0bac803e32dc42ae83fddfd029cbdebc.parquet
+    │   └── ...
+    └── a=1
+        ├── e6ab24a4f45147b49b54a662f0c412a3.parquet
+        └── ...
+
+.. ipython:: python
+    :suppress:
+
+    from shutil import rmtree
+    try:
+        rmtree('test')
+    except Exception:
+        pass
+
 .. _io.sql:
 
 SQL Queries
diff --git a/doc/source/whatsnew/v0.24.0.txt b/doc/source/whatsnew/v0.24.0.txt
index 695c4a4e16c9d..efb850418f0aa 100644
--- a/doc/source/whatsnew/v0.24.0.txt
+++ b/doc/source/whatsnew/v0.24.0.txt
@@ -235,6 +235,7 @@ Other Enhancements
 - New attribute :attr:`__git_version__` will return git commit sha of current build (:issue:`21295`).
 - Compatibility with Matplotlib 3.0 (:issue:`22790`).
 - Added :meth:`Interval.overlaps`, :meth:`IntervalArray.overlaps`, and :meth:`IntervalIndex.overlaps` for determining overlaps between interval-like objects (:issue:`21998`)
+- :func:`~DataFrame.to_parquet` now supports writing a ``DataFrame`` as a directory of parquet files partitioned by a subset of the columns when ``engine = 'pyarrow'`` (:issue:`23283`)
 - :meth:`Timestamp.tz_localize`, :meth:`DatetimeIndex.tz_localize`, and :meth:`Series.tz_localize` have gained the ``nonexistent`` argument for alternative handling of nonexistent times. See :ref:`timeseries.timezone_nonexsistent` (:issue:`8917`)
 
 .. _whatsnew_0240.api_breaking:
diff --git a/pandas/core/frame.py b/pandas/core/frame.py
index 7aadf7e735f38..8f96eb73aeb74 100644
--- a/pandas/core/frame.py
+++ b/pandas/core/frame.py
@@ -1970,7 +1970,7 @@ def to_feather(self, fname):
         to_feather(self, fname)
 
     def to_parquet(self, fname, engine='auto', compression='snappy',
-                   index=None, **kwargs):
+                   index=None, partition_cols=None, **kwargs):
         """
         Write a DataFrame to the binary parquet format.
 
@@ -1984,7 +1984,11 @@ def to_parquet(self, fname, engine='auto', compression='snappy',
         Parameters
         ----------
         fname : str
-            String file path.
+            File path or Root Directory path. Will be used as Root Directory
+            path while writing a partitioned dataset.
+
+            .. versionchanged:: 0.24.0
+
         engine : {'auto', 'pyarrow', 'fastparquet'}, default 'auto'
             Parquet library to use. If 'auto', then the option
             ``io.parquet.engine`` is used. The default ``io.parquet.engine``
@@ -1999,6 +2003,12 @@ def to_parquet(self, fname, engine='auto', compression='snappy',
 
             .. versionadded:: 0.24.0
 
+        partition_cols : list, optional, default None
+            Column names by which to partition the dataset
+            Columns are partitioned in the order they are given
+
+            .. versionadded:: 0.24.0
+
         **kwargs
             Additional arguments passed to the parquet library. See
             :ref:`pandas io <io.parquet>` for more details.
@@ -2027,7 +2037,8 @@ def to_parquet(self, fname, engine='auto', compression='snappy',
         """
         from pandas.io.parquet import to_parquet
         to_parquet(self, fname, engine,
-                   compression=compression, index=index, **kwargs)
+                   compression=compression, index=index,
+                   partition_cols=partition_cols, **kwargs)
 
     @Substitution(header='Write out the column names. If a list of strings '
                   'is given, it is assumed to be aliases for the '
diff --git a/pandas/io/parquet.py b/pandas/io/parquet.py
index 160a26533fb89..3d72b1ec3a47f 100644
--- a/pandas/io/parquet.py
+++ b/pandas/io/parquet.py
@@ -101,7 +101,8 @@ def __init__(self):
         self.api = pyarrow
 
     def write(self, df, path, compression='snappy',
-              coerce_timestamps='ms', index=None, **kwargs):
+              coerce_timestamps='ms', index=None, partition_cols=None,
+              **kwargs):
         self.validate_dataframe(df)
         path, _, _, _ = get_filepath_or_buffer(path, mode='wb')
 
@@ -109,11 +110,17 @@ def write(self, df, path, compression='snappy',
             from_pandas_kwargs = {}
         else:
             from_pandas_kwargs = {'preserve_index': index}
 
-        table = self.api.Table.from_pandas(df, **from_pandas_kwargs)
-        self.api.parquet.write_table(
-            table, path, compression=compression,
-            coerce_timestamps=coerce_timestamps, **kwargs)
+        table = self.api.Table.from_pandas(df, **from_pandas_kwargs)
+        if partition_cols is not None:
+            self.api.parquet.write_to_dataset(
+                table, path, compression=compression,
+                coerce_timestamps=coerce_timestamps,
+                partition_cols=partition_cols, **kwargs)
+        else:
+            self.api.parquet.write_table(
+                table, path, compression=compression,
+                coerce_timestamps=coerce_timestamps, **kwargs)
 
     def read(self, path, columns=None, **kwargs):
         path, _, _, should_close = get_filepath_or_buffer(path)
@@ -156,12 +163,23 @@ def __init__(self):
         )
         self.api = fastparquet
 
-    def write(self, df, path, compression='snappy', index=None, **kwargs):
+    def write(self, df, path, compression='snappy', index=None,
+              partition_cols=None, **kwargs):
         self.validate_dataframe(df)
         # thriftpy/protocol/compact.py:339:
         # DeprecationWarning: tostring() is deprecated.
         # Use tobytes() instead.
 
+        if 'partition_on' in kwargs and partition_cols is not None:
+            raise ValueError("Cannot use both partition_on and "
+                             "partition_cols. Use partition_cols for "
+                             "partitioning data")
+        elif 'partition_on' in kwargs:
+            partition_cols = kwargs.pop('partition_on')
+
+        if partition_cols is not None:
+            kwargs['file_scheme'] = 'hive'
+
         if is_s3_url(path):
             # path is s3:// so we need to open the s3file in 'wb' mode.
             # TODO: Support 'ab'
@@ -174,7 +192,8 @@ def write(self, df, path, compression='snappy', index=None, **kwargs):
 
         with catch_warnings(record=True):
             self.api.write(path, df, compression=compression,
-                           write_index=index, **kwargs)
+                           write_index=index, partition_on=partition_cols,
+                           **kwargs)
 
     def read(self, path, columns=None, **kwargs):
         if is_s3_url(path):
@@ -194,15 +213,18 @@ def read(self, path, columns=None, **kwargs):
 
 
 def to_parquet(df, path, engine='auto', compression='snappy', index=None,
-               **kwargs):
+               partition_cols=None, **kwargs):
     """
     Write a DataFrame to the parquet format.
 
     Parameters
    ----------
-    df : DataFrame
-    path : string
-        File path
+    path : str
+        File path or Root Directory path. Will be used as Root Directory path
+        while writing a partitioned dataset.
+
+        .. versionchanged:: 0.24.0
+
     engine : {'auto', 'pyarrow', 'fastparquet'}, default 'auto'
         Parquet library to use. If 'auto', then the option
         ``io.parquet.engine`` is used. The default ``io.parquet.engine``
@@ -216,11 +238,19 @@ def to_parquet(df, path, engine='auto', compression='snappy', index=None,
         engine's default behavior will be used.
 
         .. versionadded 0.24.0
+
+    partition_cols : list, optional, default None
+        Column names by which to partition the dataset
+        Columns are partitioned in the order they are given
+
+        .. versionadded:: 0.24.0
+
     kwargs
         Additional keyword arguments passed to the engine
     """
     impl = get_engine(engine)
-    return impl.write(df, path, compression=compression, index=index, **kwargs)
+    return impl.write(df, path, compression=compression, index=index,
+                      partition_cols=partition_cols, **kwargs)
 
 
 def read_parquet(path, engine='auto', columns=None, **kwargs):
diff --git a/pandas/tests/io/test_parquet.py b/pandas/tests/io/test_parquet.py
index 3b3e7f757bf60..6024fccb15c76 100644
--- a/pandas/tests/io/test_parquet.py
+++ b/pandas/tests/io/test_parquet.py
@@ -1,4 +1,5 @@
 """ test parquet compat """
+import os
 
 import pytest
 import datetime
@@ -454,6 +455,18 @@ def test_s3_roundtrip(self, df_compat, s3_resource, pa):
         check_round_trip(df_compat, pa,
                          path='s3://pandas-test/pyarrow.parquet')
 
+    def test_partition_cols_supported(self, pa, df_full):
+        # GH #23283
+        partition_cols = ['bool', 'int']
+        df = df_full
+        with tm.ensure_clean_dir() as path:
+            df.to_parquet(path, partition_cols=partition_cols,
+                          compression=None)
+            import pyarrow.parquet as pq
+            dataset = pq.ParquetDataset(path, validate_schema=False)
+            assert len(dataset.partitions.partition_names) == 2
+            assert dataset.partitions.partition_names == set(partition_cols)
+
 
 class TestParquetFastParquet(Base):
 
@@ -519,3 +532,37 @@ def test_s3_roundtrip(self, df_compat, s3_resource, fp):
         # GH #19134
         check_round_trip(df_compat, fp,
                          path='s3://pandas-test/fastparquet.parquet')
+
+    def test_partition_cols_supported(self, fp, df_full):
+        # GH #23283
+        partition_cols = ['bool', 'int']
+        df = df_full
+        with tm.ensure_clean_dir() as path:
+            df.to_parquet(path, engine="fastparquet",
+                          partition_cols=partition_cols, compression=None)
+            assert os.path.exists(path)
+            import fastparquet
+            actual_partition_cols = fastparquet.ParquetFile(path, False).cats
+            assert len(actual_partition_cols) == 2
+
+    def test_partition_on_supported(self, fp, df_full):
+        # GH #23283
+        partition_cols = ['bool', 'int']
+        df = df_full
+        with tm.ensure_clean_dir() as path:
+            df.to_parquet(path, engine="fastparquet", compression=None,
+                          partition_on=partition_cols)
+            assert os.path.exists(path)
+            import fastparquet
+            actual_partition_cols = fastparquet.ParquetFile(path, False).cats
+            assert len(actual_partition_cols) == 2
+
+    def test_error_on_using_partition_cols_and_partition_on(self, fp, df_full):
+        # GH #23283
+        partition_cols = ['bool', 'int']
+        df = df_full
+        with pytest.raises(ValueError):
+            with tm.ensure_clean_dir() as path:
+                df.to_parquet(path, engine="fastparquet", compression=None,
+                              partition_on=partition_cols,
+                              partition_cols=partition_cols)
diff --git a/pandas/tests/util/test_testing.py b/pandas/tests/util/test_testing.py
index d1dc91f94e3c4..c10ad72d39f8e 100644
--- a/pandas/tests/util/test_testing.py
+++ b/pandas/tests/util/test_testing.py
@@ -876,3 +876,10 @@ def test_datapath_missing(datapath, request):
     )
 
     assert result == expected
+
+
+def test_create_temp_directory():
+    with tm.ensure_clean_dir() as path:
+        assert os.path.exists(path)
+        assert os.path.isdir(path)
+    assert not os.path.exists(path)
diff --git a/pandas/util/testing.py b/pandas/util/testing.py
index 96387349eecd7..f0dcfda2f52ad 100644
--- a/pandas/util/testing.py
+++ b/pandas/util/testing.py
@@ -6,6 +6,7 @@
 import locale
 import os
 import re
+from shutil import rmtree
 import string
 import subprocess
 import sys
@@ -759,6 +760,25 @@ def ensure_clean(filename=None, return_filelike=False):
             print("Exception on removing file: {error}".format(error=e))
 
 
+@contextmanager
+def ensure_clean_dir():
+    """
+    Get a temporary directory path and agree to remove it on close.
+
+    Yields
+    ------
+    Temporary directory path
+    """
+    directory_name = tempfile.mkdtemp(suffix='')
+    try:
+        yield directory_name
+    finally:
+        try:
+            rmtree(directory_name)
+        except Exception:
+            pass
+
+
 # -----------------------------------------------------------------------------
 # Comparators
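
For reviewers, a minimal usage sketch of the behavior the patch adds. This is not part of the diff; the frame contents and the ``pyarrow_ds``/``fastparquet_ds`` output directories are illustrative, and the respective engines must be installed for the corresponding calls to run::

    import pandas as pd

    df = pd.DataFrame({'region': ['east', 'east', 'west', 'west'],
                       'value': [1, 2, 3, 4]})

    # pyarrow: partition_cols is forwarded to pyarrow.parquet.write_to_dataset,
    # which writes one sub-directory per unique value (region=east/, region=west/)
    df.to_parquet('pyarrow_ds', engine='pyarrow',
                  partition_cols=['region'], compression=None)

    # fastparquet: partition_cols is mapped onto partition_on with
    # file_scheme='hive'; passing both keywords raises ValueError
    df.to_parquet('fastparquet_ds', engine='fastparquet',
                  partition_cols=['region'], compression=None)

    # reading the pyarrow dataset back recovers the partition column
    print(pd.read_parquet('pyarrow_ds', engine='pyarrow'))

For the fastparquet engine the pre-existing ``partition_on`` keyword keeps working; the patch only rejects passing it together with ``partition_cols``.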