Implement chunked column-wise concat in chunked parquet reader #16052

Merged

20 commits merged into branch-24.08 from chunked_concat on Jun 26, 2024

Changes from 17 commits

Commits (20)
2f83407
initial
galipremsagar Jun 11, 2024
8445e10
Merge remote-tracking branch 'upstream/branch-24.08' into chunked_concat
galipremsagar Jun 11, 2024
10a54e3
next pass
galipremsagar Jun 12, 2024
eb20ef5
Merge remote-tracking branch 'upstream/branch-24.08' into chunked_concat
galipremsagar Jun 13, 2024
1240fa2
Merge remote-tracking branch 'upstream/branch-24.08' into chunked_concat
galipremsagar Jun 13, 2024
aa8ab9a
Merge remote-tracking branch 'upstream/branch-24.08' into chunked_concat
galipremsagar Jun 17, 2024
121adee
Merge remote-tracking branch 'upstream/branch-24.08' into chunked_concat
galipremsagar Jun 17, 2024
8a124c9
clean up
galipremsagar Jun 17, 2024
0aef5be
cleanup
galipremsagar Jun 17, 2024
6c14d8d
Merge branch 'branch-24.08' into chunked_concat
galipremsagar Jun 17, 2024
298af8a
Merge remote-tracking branch 'upstream/branch-24.08' into chunked_concat
galipremsagar Jun 25, 2024
ce989c4
Apply suggestions from code review
galipremsagar Jun 25, 2024
b223239
refactor
galipremsagar Jun 25, 2024
05f8974
Merge branch 'chunked_concat' of https://github.com/galipremsagar/cud…
galipremsagar Jun 25, 2024
5c3d37d
style
galipremsagar Jun 25, 2024
1412b92
add test
galipremsagar Jun 25, 2024
7f285e8
Merge branch 'branch-24.08' into chunked_concat
galipremsagar Jun 25, 2024
dba98ee
Merge remote-tracking branch 'upstream/branch-24.08' into chunked_concat
galipremsagar Jun 26, 2024
84efa09
simplify
galipremsagar Jun 26, 2024
f617ccb
Merge branch 'branch-24.08' into chunked_concat
galipremsagar Jun 26, 2024
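As a reading aid (not part of the PR), the idea named in the title can be sketched in plain Python: instead of concatenating whole chunk tables at the end, each new chunk is folded into the running result one column at a time, and the chunk's copy of a column is dropped as soon as it has been concatenated. `read_chunk` and `concat_columns` below are hypothetical stand-ins for the chunked-reader and column-concatenation calls, not the cudf/pylibcudf APIs used in the diff.

```python
# Illustrative sketch only; `read_chunk` and `concat_columns` are hypothetical
# stand-ins, not the cudf/pylibcudf APIs used in this PR.
def read_all_column_wise(read_chunk, concat_columns):
    result = read_chunk()  # dict of column_name -> column, or None when exhausted
    while (chunk := read_chunk()) is not None:
        for name in list(result):
            # Concatenate one column at a time and free the chunk's copy
            # immediately, so peak memory stays near "result + one extra
            # column" rather than "result + one extra table".
            result[name] = concat_columns([result[name], chunk.pop(name)])
        del chunk
    return result
```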
43 changes: 36 additions & 7 deletions python/cudf/cudf/_lib/parquet.pyx
@@ -20,6 +20,7 @@ from cudf.api.types import is_list_like
 
 from cudf._lib.utils cimport data_from_unique_ptr
 
+from cudf._lib import pylibcudf
 from cudf._lib.utils import _index_level_name, generate_pandas_metadata
 
 from libc.stdint cimport uint8_t
@@ -70,8 +71,11 @@ from cudf._lib.utils cimport table_view_from_table
 
 from pyarrow.lib import NativeFile
 
+from cudf._lib.concat import concat_columns
 from cudf.utils.ioutils import _ROW_GROUP_SIZE_BYTES_DEFAULT
 
+from cudf._lib.utils cimport data_from_pylibcudf_table
+
 
 cdef class BufferArrayFromVector:
     cdef Py_ssize_t length
@@ -878,14 +882,39 @@ cdef class ParquetReader:
         return df
 
     def read(self):
-        dfs = []
+        dfs = self._read_chunk()
         while self._has_next():
-            dfs.append(self._read_chunk())
-        df = cudf.concat(dfs)
-        df = _process_metadata(df, self.result_meta, self.names, self.row_groups,
-                               self.filepaths_or_buffers, self.pa_buffers,
-                               self.allow_range_index, self.cpp_use_pandas_metadata)
-        return df
+            dfs = [dfs, self._read_chunk()]
+            concatenated_columns = []
+            column_names = dfs[0]._column_names
+            all_columns = []
+            for i in range(len(column_names)):
+                cols = [table._data.columns[i] for table in dfs]
+                all_columns.append(cols)
+            i = 0
+            del dfs
+            while True:
+                try:
+                    columns = all_columns[0]
+                except IndexError:
+                    break
+                if len(columns) == 0:
+                    break
+                res_col = None
+                res_col = concat_columns(columns)
+                del all_columns[0]
+                concatenated_columns.append(res_col.to_pylibcudf(mode="read"))
+            dfs = cudf.DataFrame._from_data(
+                *data_from_pylibcudf_table(
+                    pylibcudf.Table(concatenated_columns),
+                    column_names=column_names,
+                    index_names=None
+                )
+            )
+
+        return _process_metadata(dfs, self.result_meta, self.names, self.row_groups,
+                                 self.filepaths_or_buffers, self.pa_buffers,
+                                 self.allow_range_index, self.cpp_use_pandas_metadata)
 
 cpdef merge_filemetadata(object filemetadata_list):
     """
20 changes: 14 additions & 6 deletions python/cudf/cudf/io/parquet.py
@@ -908,12 +908,20 @@ def _read_parquet(
"cudf engine doesn't support the "
f"following positional arguments: {list(args)}"
)
return libparquet.read_parquet(
filepaths_or_buffers,
columns=columns,
row_groups=row_groups,
use_pandas_metadata=use_pandas_metadata,
)
if cudf.get_option("mode.pandas_compatible"):
return libparquet.ParquetReader(
mhaseeb123 marked this conversation as resolved.
Show resolved Hide resolved
filepaths_or_buffers,
columns=columns,
row_groups=row_groups,
use_pandas_metadata=use_pandas_metadata,
).read()
else:
return libparquet.read_parquet(
filepaths_or_buffers,
columns=columns,
row_groups=row_groups,
use_pandas_metadata=use_pandas_metadata,
)
else:
if (
isinstance(filepaths_or_buffers, list)
Expand Down
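For reference (not part of the diff), here is a minimal sketch of driving the chunked reader directly with the same keyword arguments the pandas-compatible branch above passes. The import path and the acceptance of an in-memory buffer are assumptions based on this file; with default limits the read may complete in a single chunk.

```python
# Sketch under the assumptions stated above; not part of the PR.
from io import BytesIO

import pandas as pd

from cudf._lib import parquet as libparquet

buffer = BytesIO()
pd.DataFrame({"a": [1, 2, 3, 4] * 1000}).to_parquet(buffer)

reader = libparquet.ParquetReader(
    [buffer],  # filepaths_or_buffers, as passed by _read_parquet
    columns=None,
    row_groups=None,
    use_pandas_metadata=True,
)
df = reader.read()  # loops over chunks internally, concatenating column-wise
```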
11 changes: 11 additions & 0 deletions python/cudf/cudf/tests/test_parquet.py
@@ -3485,3 +3485,14 @@ def test_parquet_chunked_reader(
     )
     actual = reader.read()
     assert_eq(expected, actual)
+
+
+def test_parquet_reader_pandas_compatibility():
+    df = pd.DataFrame(
+        {"a": [1, 2, 3, 4] * 10000, "b": ["av", "qw", "hi", "xyz"] * 10000}
+    )
+    buffer = BytesIO()
+    df.to_parquet(buffer)
+    with cudf.option_context("mode.pandas_compatible", True):
+        expected = cudf.read_parquet(buffer)
+    assert_eq(expected, df)
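A companion check (not part of the PR) that the new pandas-compatible path and the default read_parquet path return the same data for the same buffer; it mirrors the test above and assumes only public cudf and pandas APIs, plus the assumption that both paths round-trip this frame identically.

```python
# Companion sketch; assumes both read paths return the same frame for this file.
from io import BytesIO

import pandas as pd

import cudf

df = pd.DataFrame({"a": [1, 2, 3, 4] * 10, "b": ["av", "qw", "hi", "xyz"] * 10})
buffer = BytesIO()
df.to_parquet(buffer)

with cudf.option_context("mode.pandas_compatible", True):
    chunked_result = cudf.read_parquet(buffer)  # routed through ParquetReader
default_result = cudf.read_parquet(buffer)      # routed through read_parquet

pd.testing.assert_frame_equal(chunked_result.to_pandas(), df)
pd.testing.assert_frame_equal(default_result.to_pandas(), df)
```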