diff --git a/python/cudf/cudf/_lib/parquet.pyx b/python/cudf/cudf/_lib/parquet.pyx
index 7914ed7e9d9..d1ec5be9e62 100644
--- a/python/cudf/cudf/_lib/parquet.pyx
+++ b/python/cudf/cudf/_lib/parquet.pyx
@@ -20,6 +20,7 @@ from cudf.api.types import is_list_like
 
 from cudf._lib.utils cimport data_from_unique_ptr
+from cudf._lib import pylibcudf
 from cudf._lib.utils import _index_level_name, generate_pandas_metadata
 
 from libc.stdint cimport uint8_t
@@ -70,8 +71,11 @@ from cudf._lib.utils cimport table_view_from_table
 
 from pyarrow.lib import NativeFile
 
+from cudf._lib.concat import concat_columns
 from cudf.utils.ioutils import _ROW_GROUP_SIZE_BYTES_DEFAULT
 
+from cudf._lib.utils cimport data_from_pylibcudf_table
+
 
 cdef class BufferArrayFromVector:
     cdef Py_ssize_t length
@@ -878,14 +882,32 @@ cdef class ParquetReader:
         return df
 
     def read(self):
-        dfs = []
+        dfs = self._read_chunk()
+        column_names = dfs._column_names
+        concatenated_columns = list(dfs._columns)
+        del dfs
         while self._has_next():
-            dfs.append(self._read_chunk())
-        df = cudf.concat(dfs)
-        df = _process_metadata(df, self.result_meta, self.names, self.row_groups,
-                               self.filepaths_or_buffers, self.pa_buffers,
-                               self.allow_range_index, self.cpp_use_pandas_metadata)
-        return df
+            new_chunk = list(self._read_chunk()._columns)
+            for i in range(len(column_names)):
+                concatenated_columns[i] = concat_columns(
+                    [concatenated_columns[i], new_chunk[i]]
+                )
+                # Must drop any residual GPU columns to save memory
+                new_chunk[i] = None
+
+        dfs = cudf.DataFrame._from_data(
+            *data_from_pylibcudf_table(
+                pylibcudf.Table(
+                    [col.to_pylibcudf(mode="read") for col in concatenated_columns]
+                ),
+                column_names=column_names,
+                index_names=None
+            )
+        )
+
+        return _process_metadata(dfs, self.result_meta, self.names, self.row_groups,
+                                 self.filepaths_or_buffers, self.pa_buffers,
+                                 self.allow_range_index, self.cpp_use_pandas_metadata)
 
 
 cpdef merge_filemetadata(object filemetadata_list):
     """
diff --git a/python/cudf/cudf/io/parquet.py b/python/cudf/cudf/io/parquet.py
index 58b104b84e9..2a838ca7417 100644
--- a/python/cudf/cudf/io/parquet.py
+++ b/python/cudf/cudf/io/parquet.py
@@ -908,12 +908,20 @@ def _read_parquet(
                 "cudf engine doesn't support the "
                 f"following positional arguments: {list(args)}"
             )
-        return libparquet.read_parquet(
-            filepaths_or_buffers,
-            columns=columns,
-            row_groups=row_groups,
-            use_pandas_metadata=use_pandas_metadata,
-        )
+        if cudf.get_option("mode.pandas_compatible"):
+            return libparquet.ParquetReader(
+                filepaths_or_buffers,
+                columns=columns,
+                row_groups=row_groups,
+                use_pandas_metadata=use_pandas_metadata,
+            ).read()
+        else:
+            return libparquet.read_parquet(
+                filepaths_or_buffers,
+                columns=columns,
+                row_groups=row_groups,
+                use_pandas_metadata=use_pandas_metadata,
+            )
     else:
         if (
             isinstance(filepaths_or_buffers, list)
diff --git a/python/cudf/cudf/tests/test_parquet.py b/python/cudf/cudf/tests/test_parquet.py
index e1e7952605b..588bc87d268 100644
--- a/python/cudf/cudf/tests/test_parquet.py
+++ b/python/cudf/cudf/tests/test_parquet.py
@@ -3485,3 +3485,14 @@ def test_parquet_chunked_reader(
     )
     actual = reader.read()
     assert_eq(expected, actual)
+
+
+def test_parquet_reader_pandas_compatibility():
+    df = pd.DataFrame(
+        {"a": [1, 2, 3, 4] * 10000, "b": ["av", "qw", "hi", "xyz"] * 10000}
+    )
+    buffer = BytesIO()
+    df.to_parquet(buffer)
+    with cudf.option_context("mode.pandas_compatible", True):
+        expected = cudf.read_parquet(buffer)
+    assert_eq(expected, df)