Commit
Implement chunked column wise concat in chunked parquet reader (rapidsai#16052)

This PR implements column-wise concatenation in the chunked parquet reader, which avoids the excess peak memory of a single DataFrame-level concat: each chunk's columns are merged into the running result and freed immediately, rather than keeping every chunk alive until one final concat.
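As a rough illustration of why this helps, here is a minimal pure-Python sketch of the column-wise pattern; plain lists stand in for GPU columns, and `read_columnwise` is illustrative rather than a cudf API. Concatenating whole chunk tables at the end keeps every chunk alive until the final concat, whereas merging column-by-column lets each chunk's column be dropped as soon as it has been folded into the result.

```python
# Minimal sketch, not cudf code: each chunk is a list of columns, and
# plain Python lists stand in for GPU columns.
def read_columnwise(chunks):
    chunks = iter(chunks)
    result = list(next(chunks))                # first chunk seeds the result
    for chunk in chunks:
        chunk = list(chunk)
        for i in range(len(result)):
            result[i] = result[i] + chunk[i]   # per-column concat
            chunk[i] = None                    # drop the merged column early
    return result

# Two 2-column chunks -> one 2-column table
assert read_columnwise([[[1, 2], [10, 20]], [[3], [30]]]) == [[1, 2, 3], [10, 20, 30]]
```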

Authors:
  - GALI PREM SAGAR (https://github.com/galipremsagar)

Approvers:
  - Muhammad Haseeb (https://github.com/mhaseeb123)
  - Matthew Roeschke (https://github.com/mroeschke)

URL: rapidsai#16052
galipremsagar authored Jun 26, 2024
1 parent f1efa40 commit e7cf69d
Showing 3 changed files with 54 additions and 13 deletions.
36 changes: 29 additions & 7 deletions python/cudf/cudf/_lib/parquet.pyx
@@ -20,6 +20,7 @@ from cudf.api.types import is_list_like
 
 from cudf._lib.utils cimport data_from_unique_ptr
 
+from cudf._lib import pylibcudf
 from cudf._lib.utils import _index_level_name, generate_pandas_metadata
 
 from libc.stdint cimport uint8_t
@@ -70,8 +71,11 @@ from cudf._lib.utils cimport table_view_from_table
 
 from pyarrow.lib import NativeFile
 
+from cudf._lib.concat import concat_columns
 from cudf.utils.ioutils import _ROW_GROUP_SIZE_BYTES_DEFAULT
 
+from cudf._lib.utils cimport data_from_pylibcudf_table
+
 
 cdef class BufferArrayFromVector:
     cdef Py_ssize_t length
@@ -878,14 +882,32 @@ cdef class ParquetReader:
         return df
 
     def read(self):
-        dfs = []
+        dfs = self._read_chunk()
+        column_names = dfs._column_names
+        concatenated_columns = list(dfs._columns)
+        del dfs
         while self._has_next():
-            dfs.append(self._read_chunk())
-        df = cudf.concat(dfs)
-        df = _process_metadata(df, self.result_meta, self.names, self.row_groups,
-                               self.filepaths_or_buffers, self.pa_buffers,
-                               self.allow_range_index, self.cpp_use_pandas_metadata)
-        return df
+            new_chunk = list(self._read_chunk()._columns)
+            for i in range(len(column_names)):
+                concatenated_columns[i] = concat_columns(
+                    [concatenated_columns[i], new_chunk[i]]
+                )
+                # Must drop any residual GPU columns to save memory
+                new_chunk[i] = None
+
+        dfs = cudf.DataFrame._from_data(
+            *data_from_pylibcudf_table(
+                pylibcudf.Table(
+                    [col.to_pylibcudf(mode="read") for col in concatenated_columns]
+                ),
+                column_names=column_names,
+                index_names=None
+            )
+        )
+
+        return _process_metadata(dfs, self.result_meta, self.names, self.row_groups,
+                                 self.filepaths_or_buffers, self.pa_buffers,
+                                 self.allow_range_index, self.cpp_use_pandas_metadata)
 
 cpdef merge_filemetadata(object filemetadata_list):
     """
20 changes: 14 additions & 6 deletions python/cudf/cudf/io/parquet.py
@@ -908,12 +908,20 @@ def _read_parquet(
                 "cudf engine doesn't support the "
                 f"following positional arguments: {list(args)}"
             )
-        return libparquet.read_parquet(
-            filepaths_or_buffers,
-            columns=columns,
-            row_groups=row_groups,
-            use_pandas_metadata=use_pandas_metadata,
-        )
+        if cudf.get_option("mode.pandas_compatible"):
+            return libparquet.ParquetReader(
+                filepaths_or_buffers,
+                columns=columns,
+                row_groups=row_groups,
+                use_pandas_metadata=use_pandas_metadata,
+            ).read()
+        else:
+            return libparquet.read_parquet(
+                filepaths_or_buffers,
+                columns=columns,
+                row_groups=row_groups,
+                use_pandas_metadata=use_pandas_metadata,
+            )
     else:
         if (
             isinstance(filepaths_or_buffers, list)
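With this change, `cudf.read_parquet` goes through the chunked `ParquetReader` whenever pandas-compatible mode is enabled; a minimal usage sketch (the file path is hypothetical):

```python
import cudf

# Hypothetical path; under mode.pandas_compatible, read_parquet uses the
# chunked reader and concatenates results column-wise.
with cudf.option_context("mode.pandas_compatible", True):
    df = cudf.read_parquet("data.parquet")
```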
11 changes: 11 additions & 0 deletions python/cudf/cudf/tests/test_parquet.py
@@ -3485,3 +3485,14 @@ def test_parquet_chunked_reader(
     )
     actual = reader.read()
     assert_eq(expected, actual)
+
+
+def test_parquet_reader_pandas_compatibility():
+    df = pd.DataFrame(
+        {"a": [1, 2, 3, 4] * 10000, "b": ["av", "qw", "hi", "xyz"] * 10000}
+    )
+    buffer = BytesIO()
+    df.to_parquet(buffer)
+    with cudf.option_context("mode.pandas_compatible", True):
+        expected = cudf.read_parquet(buffer)
+    assert_eq(expected, df)
