Skip to content

Commit

Permalink
ENH: Support for partition_cols in to_parquet (pandas-dev#23321)
Browse files Browse the repository at this point in the history
  • Loading branch information
anjsudh authored and Pingviinituutti committed Feb 28, 2019
1 parent 7d78634 commit a8f3abe
Show file tree
Hide file tree
Showing 7 changed files with 167 additions and 15 deletions.
37 changes: 37 additions & 0 deletions doc/source/io.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4673,6 +4673,43 @@ Passing ``index=True`` will *always* write the index, even if that's not the
underlying engine's default behavior.


Partitioning Parquet files
''''''''''''''''''''''''''

.. versionadded:: 0.24.0

Parquet supports partitioning of data based on the values of one or more columns.

.. ipython:: python
df = pd.DataFrame({'a': [0, 0, 1, 1], 'b': [0, 1, 0, 1]})
df.to_parquet(fname='test', engine='pyarrow', partition_cols=['a'], compression=None)
The `fname` specifies the parent directory to which data will be saved.
The `partition_cols` are the column names by which the dataset will be partitioned.
Columns are partitioned in the order they are given. The partition splits are
determined by the unique values in the partition columns.
The above example creates a partitioned dataset that may look like:

.. code-block:: text
test
├── a=0
│ ├── 0bac803e32dc42ae83fddfd029cbdebc.parquet
│ └── ...
└── a=1
├── e6ab24a4f45147b49b54a662f0c412a3.parquet
└── ...
.. ipython:: python
:suppress:
from shutil import rmtree
try:
rmtree('test')
except Exception:
pass
.. _io.sql:

SQL Queries
Expand Down
1 change: 1 addition & 0 deletions doc/source/whatsnew/v0.24.0.txt
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ Other Enhancements
- New attribute :attr:`__git_version__` will return git commit sha of current build (:issue:`21295`).
- Compatibility with Matplotlib 3.0 (:issue:`22790`).
- Added :meth:`Interval.overlaps`, :meth:`IntervalArray.overlaps`, and :meth:`IntervalIndex.overlaps` for determining overlaps between interval-like objects (:issue:`21998`)
- :func:`~DataFrame.to_parquet` now supports writing a ``DataFrame`` as a directory of parquet files partitioned by a subset of the columns when ``engine = 'pyarrow'`` (:issue:`23283`)
- :meth:`Timestamp.tz_localize`, :meth:`DatetimeIndex.tz_localize`, and :meth:`Series.tz_localize` have gained the ``nonexistent`` argument for alternative handling of nonexistent times. See :ref:`timeseries.timezone_nonexsistent` (:issue:`8917`)

.. _whatsnew_0240.api_breaking:
Expand Down
17 changes: 14 additions & 3 deletions pandas/core/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -1970,7 +1970,7 @@ def to_feather(self, fname):
to_feather(self, fname)

def to_parquet(self, fname, engine='auto', compression='snappy',
index=None, **kwargs):
index=None, partition_cols=None, **kwargs):
"""
Write a DataFrame to the binary parquet format.
Expand All @@ -1984,7 +1984,11 @@ def to_parquet(self, fname, engine='auto', compression='snappy',
Parameters
----------
fname : str
String file path.
File path or Root Directory path. Will be used as Root Directory
path while writing a partitioned dataset.
.. versionchanged:: 0.24.0
engine : {'auto', 'pyarrow', 'fastparquet'}, default 'auto'
Parquet library to use. If 'auto', then the option
``io.parquet.engine`` is used. The default ``io.parquet.engine``
Expand All @@ -1999,6 +2003,12 @@ def to_parquet(self, fname, engine='auto', compression='snappy',
.. versionadded:: 0.24.0
partition_cols : list, optional, default None
Column names by which to partition the dataset
Columns are partitioned in the order they are given
.. versionadded:: 0.24.0
**kwargs
Additional arguments passed to the parquet library. See
:ref:`pandas io <io.parquet>` for more details.
Expand Down Expand Up @@ -2027,7 +2037,8 @@ def to_parquet(self, fname, engine='auto', compression='snappy',
"""
from pandas.io.parquet import to_parquet
to_parquet(self, fname, engine,
compression=compression, index=index, **kwargs)
compression=compression, index=index,
partition_cols=partition_cols, **kwargs)

@Substitution(header='Write out the column names. If a list of strings '
'is given, it is assumed to be aliases for the '
Expand Down
53 changes: 41 additions & 12 deletions pandas/io/parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,19 +101,25 @@ def __init__(self):
self.api = pyarrow

def write(self, df, path, compression='snappy',
coerce_timestamps='ms', index=None, **kwargs):
coerce_timestamps='ms', index=None, partition_cols=None,
**kwargs):
self.validate_dataframe(df)
path, _, _, _ = get_filepath_or_buffer(path, mode='wb')

if index is None:
from_pandas_kwargs = {}
else:
from_pandas_kwargs = {'preserve_index': index}

table = self.api.Table.from_pandas(df, **from_pandas_kwargs)
self.api.parquet.write_table(
table, path, compression=compression,
coerce_timestamps=coerce_timestamps, **kwargs)
if partition_cols is not None:
self.api.parquet.write_to_dataset(
table, path, compression=compression,
coerce_timestamps=coerce_timestamps,
partition_cols=partition_cols, **kwargs)
else:
self.api.parquet.write_table(
table, path, compression=compression,
coerce_timestamps=coerce_timestamps, **kwargs)

def read(self, path, columns=None, **kwargs):
path, _, _, should_close = get_filepath_or_buffer(path)
Expand Down Expand Up @@ -156,12 +162,23 @@ def __init__(self):
)
self.api = fastparquet

def write(self, df, path, compression='snappy', index=None, **kwargs):
def write(self, df, path, compression='snappy', index=None,
partition_cols=None, **kwargs):
self.validate_dataframe(df)
# thriftpy/protocol/compact.py:339:
# DeprecationWarning: tostring() is deprecated.
# Use tobytes() instead.

if 'partition_on' in kwargs and partition_cols is not None:
raise ValueError("Cannot use both partition_on and "
"partition_cols. Use partition_cols for "
"partitioning data")
elif 'partition_on' in kwargs:
partition_cols = kwargs.pop('partition_on')

if partition_cols is not None:
kwargs['file_scheme'] = 'hive'

if is_s3_url(path):
# path is s3:// so we need to open the s3file in 'wb' mode.
# TODO: Support 'ab'
Expand All @@ -174,7 +191,8 @@ def write(self, df, path, compression='snappy', index=None, **kwargs):

with catch_warnings(record=True):
self.api.write(path, df, compression=compression,
write_index=index, **kwargs)
write_index=index, partition_on=partition_cols,
**kwargs)

def read(self, path, columns=None, **kwargs):
if is_s3_url(path):
Expand All @@ -194,15 +212,18 @@ def read(self, path, columns=None, **kwargs):


def to_parquet(df, path, engine='auto', compression='snappy', index=None,
**kwargs):
partition_cols=None, **kwargs):
"""
Write a DataFrame to the parquet format.
Parameters
----------
df : DataFrame
path : string
File path
path : str
File path or Root Directory path. Will be used as Root Directory path
while writing a partitioned dataset.
.. versionchanged:: 0.24.0
engine : {'auto', 'pyarrow', 'fastparquet'}, default 'auto'
Parquet library to use. If 'auto', then the option
``io.parquet.engine`` is used. The default ``io.parquet.engine``
Expand All @@ -216,11 +237,19 @@ def to_parquet(df, path, engine='auto', compression='snappy', index=None,
engine's default behavior will be used.
.. versionadded 0.24.0
partition_cols : list, optional, default None
Column names by which to partition the dataset
Columns are partitioned in the order they are given
.. versionadded:: 0.24.0
kwargs
Additional keyword arguments passed to the engine
"""
impl = get_engine(engine)
return impl.write(df, path, compression=compression, index=index, **kwargs)
return impl.write(df, path, compression=compression, index=index,
partition_cols=partition_cols, **kwargs)


def read_parquet(path, engine='auto', columns=None, **kwargs):
Expand Down
47 changes: 47 additions & 0 deletions pandas/tests/io/test_parquet.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
""" test parquet compat """
import os

import pytest
import datetime
Expand Down Expand Up @@ -454,6 +455,18 @@ def test_s3_roundtrip(self, df_compat, s3_resource, pa):
check_round_trip(df_compat, pa,
path='s3://pandas-test/pyarrow.parquet')

def test_partition_cols_supported(self, pa, df_full):
# GH #23283
partition_cols = ['bool', 'int']
df = df_full
with tm.ensure_clean_dir() as path:
df.to_parquet(path, partition_cols=partition_cols,
compression=None)
import pyarrow.parquet as pq
dataset = pq.ParquetDataset(path, validate_schema=False)
assert len(dataset.partitions.partition_names) == 2
assert dataset.partitions.partition_names == set(partition_cols)


class TestParquetFastParquet(Base):

Expand Down Expand Up @@ -519,3 +532,37 @@ def test_s3_roundtrip(self, df_compat, s3_resource, fp):
# GH #19134
check_round_trip(df_compat, fp,
path='s3://pandas-test/fastparquet.parquet')

def test_partition_cols_supported(self, fp, df_full):
# GH #23283
partition_cols = ['bool', 'int']
df = df_full
with tm.ensure_clean_dir() as path:
df.to_parquet(path, engine="fastparquet",
partition_cols=partition_cols, compression=None)
assert os.path.exists(path)
import fastparquet
actual_partition_cols = fastparquet.ParquetFile(path, False).cats
assert len(actual_partition_cols) == 2

def test_partition_on_supported(self, fp, df_full):
# GH #23283
partition_cols = ['bool', 'int']
df = df_full
with tm.ensure_clean_dir() as path:
df.to_parquet(path, engine="fastparquet", compression=None,
partition_on=partition_cols)
assert os.path.exists(path)
import fastparquet
actual_partition_cols = fastparquet.ParquetFile(path, False).cats
assert len(actual_partition_cols) == 2

def test_error_on_using_partition_cols_and_partition_on(self, fp, df_full):
# GH #23283
partition_cols = ['bool', 'int']
df = df_full
with pytest.raises(ValueError):
with tm.ensure_clean_dir() as path:
df.to_parquet(path, engine="fastparquet", compression=None,
partition_on=partition_cols,
partition_cols=partition_cols)
7 changes: 7 additions & 0 deletions pandas/tests/util/test_testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -876,3 +876,10 @@ def test_datapath_missing(datapath, request):
)

assert result == expected


def test_create_temp_directory():
with tm.ensure_clean_dir() as path:
assert os.path.exists(path)
assert os.path.isdir(path)
assert not os.path.exists(path)
20 changes: 20 additions & 0 deletions pandas/util/testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import locale
import os
import re
from shutil import rmtree
import string
import subprocess
import sys
Expand Down Expand Up @@ -761,6 +762,25 @@ def ensure_clean(filename=None, return_filelike=False):
print("Exception on removing file: {error}".format(error=e))


@contextmanager
def ensure_clean_dir():
"""
Get a temporary directory path and agrees to remove on close.
Yields
------
Temporary directory path
"""
directory_name = tempfile.mkdtemp(suffix='')
try:
yield directory_name
finally:
try:
rmtree(directory_name)
except Exception:
pass


# -----------------------------------------------------------------------------
# Comparators

Expand Down

0 comments on commit a8f3abe

Please sign in to comment.