From 2f83407b1f4566c791367447140020c16973d54a Mon Sep 17 00:00:00 2001 From: galipremsagar Date: Tue, 11 Jun 2024 21:54:20 +0000 Subject: [PATCH 1/9] initial --- python/cudf/cudf/_lib/concat.pyx | 63 ++++++++++++++++++++++++++----- python/cudf/cudf/_lib/parquet.pyx | 11 ++++-- python/cudf/cudf/io/parquet.py | 9 ++++- 3 files changed, 69 insertions(+), 14 deletions(-) diff --git a/python/cudf/cudf/_lib/concat.pyx b/python/cudf/cudf/_lib/concat.pyx index 89ddcfee99e..470686a0d6b 100644 --- a/python/cudf/cudf/_lib/concat.pyx +++ b/python/cudf/cudf/_lib/concat.pyx @@ -7,6 +7,7 @@ from cudf._lib.utils cimport data_from_pylibcudf_table from cudf._lib import pylibcudf from cudf.core.buffer import acquire_spill_lock +import cudf @acquire_spill_lock() @@ -20,15 +21,57 @@ def concat_columns(object columns): @acquire_spill_lock() def concat_tables(object tables, bool ignore_index=False): - plc_tables = [] - for table in tables: - cols = table._data.columns + if cudf.get_option("mode.pandas_compatible"): + if not tables: + return None + + # Get the column names and index names from the first table + column_names = tables[0]._column_names + index_names = None if ignore_index else tables[0]._index_names + + # Concatenate each column separately + concatenated_columns = [] + print(column_names) + for i in range(len(column_names)): + columns = [table._data.columns[i] for table in tables] + res_col = None + #for table in tables: + # if res_col is None: + # res_col = table._data.columns[i] + # else: + # res_col = concat_columns([res_col, table._data.columns[i]]) + res_col = concat_columns(columns) + print("43") + del columns + for table in tables: + #table._data[column_names[i]] = table._data[column_names[i]].slice(0, 0) + pass + concatenated_columns.append(res_col.to_pylibcudf(mode="read")) + + # Concatenate index columns if not ignoring the index if not ignore_index: - cols = table._index._data.columns + cols - plc_tables.append(pylibcudf.Table([c.to_pylibcudf(mode="read") for c in cols])) + index_columns = [] + for i in range(len(index_names)): + columns = [table._index._data.columns[i] for table in tables] + index_columns.append(concat_columns(columns).to_pylibcudf(mode="read")) + concatenated_columns = index_columns + concatenated_columns - return data_from_pylibcudf_table( - pylibcudf.concatenate.concatenate(plc_tables), - column_names=tables[0]._column_names, - index_names=None if ignore_index else tables[0]._index_names - ) + # Create a new table from the concatenated columns + return data_from_pylibcudf_table( + pylibcudf.Table(concatenated_columns), + column_names=column_names, + index_names=index_names + ) + else: + plc_tables = [] + for table in tables: + cols = table._data.columns + if not ignore_index: + cols = table._index._data.columns + cols + plc_tables.append(pylibcudf.Table([c.to_pylibcudf(mode="read") for c in cols])) + + return data_from_pylibcudf_table( + pylibcudf.concatenate.concatenate(plc_tables), + column_names=tables[0]._column_names, + index_names=None if ignore_index else tables[0]._index_names + ) \ No newline at end of file diff --git a/python/cudf/cudf/_lib/parquet.pyx b/python/cudf/cudf/_lib/parquet.pyx index f6f9cfa9a7c..bb0d23b8ad5 100644 --- a/python/cudf/cudf/_lib/parquet.pyx +++ b/python/cudf/cudf/_lib/parquet.pyx @@ -878,10 +878,15 @@ cdef class ParquetReader: return df def read(self): - dfs = [] + dfs = None while self._has_next(): - dfs.append(self._read_chunk()) - df = cudf.concat(dfs) + if dfs is None: + dfs = self._read_chunk() + else: + dfs = cudf.concat([dfs, 
self._read_chunk()]) + #dfs.append(self._read_chunk()) + #df = cudf.concat(dfs) + df = dfs df = _process_metadata(df, self.result_meta, self.names, self.row_groups, self.filepaths_or_buffers, self.pa_buffers, self.allow_range_index, self.cpp_use_pandas_metadata) diff --git a/python/cudf/cudf/io/parquet.py b/python/cudf/cudf/io/parquet.py index dbdb2093b72..a007ed3bc0d 100644 --- a/python/cudf/cudf/io/parquet.py +++ b/python/cudf/cudf/io/parquet.py @@ -908,12 +908,19 @@ def _read_parquet( "cudf engine doesn't support the " f"following positional arguments: {list(args)}" ) - return libparquet.read_parquet( + x = libparquet.ParquetReader( filepaths_or_buffers, columns=columns, row_groups=row_groups, use_pandas_metadata=use_pandas_metadata, ) + return x.read() + # return libparquet.read_parquet( + # filepaths_or_buffers, + # columns=columns, + # row_groups=row_groups, + # use_pandas_metadata=use_pandas_metadata, + # ) else: if ( isinstance(filepaths_or_buffers, list) From 10a54e3ce84326d8900d23164f81e6e66d0defb9 Mon Sep 17 00:00:00 2001 From: galipremsagar Date: Wed, 12 Jun 2024 12:32:09 +0000 Subject: [PATCH 2/9] next pass --- python/cudf/cudf/_lib/parquet.pyx | 37 ++++++++++++++++++++++++++++--- 1 file changed, 34 insertions(+), 3 deletions(-) diff --git a/python/cudf/cudf/_lib/parquet.pyx b/python/cudf/cudf/_lib/parquet.pyx index bb0d23b8ad5..5fca43f6fae 100644 --- a/python/cudf/cudf/_lib/parquet.pyx +++ b/python/cudf/cudf/_lib/parquet.pyx @@ -21,7 +21,7 @@ from cudf.api.types import is_list_like from cudf._lib.utils cimport data_from_unique_ptr from cudf._lib.utils import _index_level_name, generate_pandas_metadata - +from cudf._lib import pylibcudf from libc.stdint cimport uint8_t from libcpp cimport bool from libcpp.map cimport map @@ -71,7 +71,8 @@ from cudf._lib.utils cimport table_view_from_table from pyarrow.lib import NativeFile from cudf.utils.ioutils import _ROW_GROUP_SIZE_BYTES_DEFAULT - +from cudf._lib.concat import concat_columns +from cudf._lib.utils cimport data_from_pylibcudf_table cdef class BufferArrayFromVector: cdef Py_ssize_t length @@ -883,7 +884,37 @@ cdef class ParquetReader: if dfs is None: dfs = self._read_chunk() else: - dfs = cudf.concat([dfs, self._read_chunk()]) + dfs = [dfs, self._read_chunk()] + concatenated_columns = [] + column_names = dfs[0]._column_names + all_columns = [] + for i in range(len(column_names)): + cols = [table._data.columns[i] for table in dfs] + all_columns.append(cols) + i = 0 + del dfs + while True: + try: + columns = all_columns[0] + except IndexError: + break + if len(columns) == 0: + break + res_col = None + #for table in tables: + # if res_col is None: + # res_col = table._data.columns[i] + # else: + # res_col = concat_columns([res_col, table._data.columns[i]]) + res_col = concat_columns(columns) + del all_columns[0] + concatenated_columns.append(res_col.to_pylibcudf(mode="read")) + dfs = cudf.DataFrame._from_data( + *data_from_pylibcudf_table( + pylibcudf.Table(concatenated_columns), + column_names=column_names, + index_names=None + )) #dfs.append(self._read_chunk()) #df = cudf.concat(dfs) df = dfs From 8a124c9e1bf4efcb1535cd6e9e043f6f7f310a95 Mon Sep 17 00:00:00 2001 From: galipremsagar Date: Mon, 17 Jun 2024 22:21:17 +0000 Subject: [PATCH 3/9] clean up --- python/cudf/cudf/_lib/concat.pyx | 63 +++++-------------------------- python/cudf/cudf/_lib/parquet.pyx | 24 ++++++------ python/cudf/cudf/io/parquet.py | 27 ++++++------- 3 files changed, 35 insertions(+), 79 deletions(-) diff --git a/python/cudf/cudf/_lib/concat.pyx 
b/python/cudf/cudf/_lib/concat.pyx index 470686a0d6b..89ddcfee99e 100644 --- a/python/cudf/cudf/_lib/concat.pyx +++ b/python/cudf/cudf/_lib/concat.pyx @@ -7,7 +7,6 @@ from cudf._lib.utils cimport data_from_pylibcudf_table from cudf._lib import pylibcudf from cudf.core.buffer import acquire_spill_lock -import cudf @acquire_spill_lock() @@ -21,57 +20,15 @@ def concat_columns(object columns): @acquire_spill_lock() def concat_tables(object tables, bool ignore_index=False): - if cudf.get_option("mode.pandas_compatible"): - if not tables: - return None - - # Get the column names and index names from the first table - column_names = tables[0]._column_names - index_names = None if ignore_index else tables[0]._index_names - - # Concatenate each column separately - concatenated_columns = [] - print(column_names) - for i in range(len(column_names)): - columns = [table._data.columns[i] for table in tables] - res_col = None - #for table in tables: - # if res_col is None: - # res_col = table._data.columns[i] - # else: - # res_col = concat_columns([res_col, table._data.columns[i]]) - res_col = concat_columns(columns) - print("43") - del columns - for table in tables: - #table._data[column_names[i]] = table._data[column_names[i]].slice(0, 0) - pass - concatenated_columns.append(res_col.to_pylibcudf(mode="read")) - - # Concatenate index columns if not ignoring the index + plc_tables = [] + for table in tables: + cols = table._data.columns if not ignore_index: - index_columns = [] - for i in range(len(index_names)): - columns = [table._index._data.columns[i] for table in tables] - index_columns.append(concat_columns(columns).to_pylibcudf(mode="read")) - concatenated_columns = index_columns + concatenated_columns + cols = table._index._data.columns + cols + plc_tables.append(pylibcudf.Table([c.to_pylibcudf(mode="read") for c in cols])) - # Create a new table from the concatenated columns - return data_from_pylibcudf_table( - pylibcudf.Table(concatenated_columns), - column_names=column_names, - index_names=index_names - ) - else: - plc_tables = [] - for table in tables: - cols = table._data.columns - if not ignore_index: - cols = table._index._data.columns + cols - plc_tables.append(pylibcudf.Table([c.to_pylibcudf(mode="read") for c in cols])) - - return data_from_pylibcudf_table( - pylibcudf.concatenate.concatenate(plc_tables), - column_names=tables[0]._column_names, - index_names=None if ignore_index else tables[0]._index_names - ) \ No newline at end of file + return data_from_pylibcudf_table( + pylibcudf.concatenate.concatenate(plc_tables), + column_names=tables[0]._column_names, + index_names=None if ignore_index else tables[0]._index_names + ) diff --git a/python/cudf/cudf/_lib/parquet.pyx b/python/cudf/cudf/_lib/parquet.pyx index 5fca43f6fae..7a8a204f11e 100644 --- a/python/cudf/cudf/_lib/parquet.pyx +++ b/python/cudf/cudf/_lib/parquet.pyx @@ -20,8 +20,9 @@ from cudf.api.types import is_list_like from cudf._lib.utils cimport data_from_unique_ptr -from cudf._lib.utils import _index_level_name, generate_pandas_metadata from cudf._lib import pylibcudf +from cudf._lib.utils import _index_level_name, generate_pandas_metadata + from libc.stdint cimport uint8_t from libcpp cimport bool from libcpp.map cimport map @@ -70,10 +71,12 @@ from cudf._lib.utils cimport table_view_from_table from pyarrow.lib import NativeFile -from cudf.utils.ioutils import _ROW_GROUP_SIZE_BYTES_DEFAULT from cudf._lib.concat import concat_columns +from cudf.utils.ioutils import _ROW_GROUP_SIZE_BYTES_DEFAULT + from cudf._lib.utils 
cimport data_from_pylibcudf_table + cdef class BufferArrayFromVector: cdef Py_ssize_t length cdef unique_ptr[vector[uint8_t]] in_vec @@ -901,22 +904,17 @@ cdef class ParquetReader: if len(columns) == 0: break res_col = None - #for table in tables: - # if res_col is None: - # res_col = table._data.columns[i] - # else: - # res_col = concat_columns([res_col, table._data.columns[i]]) res_col = concat_columns(columns) del all_columns[0] concatenated_columns.append(res_col.to_pylibcudf(mode="read")) dfs = cudf.DataFrame._from_data( *data_from_pylibcudf_table( - pylibcudf.Table(concatenated_columns), - column_names=column_names, - index_names=None - )) - #dfs.append(self._read_chunk()) - #df = cudf.concat(dfs) + pylibcudf.Table(concatenated_columns), + column_names=column_names, + index_names=None + ) + ) + df = dfs df = _process_metadata(df, self.result_meta, self.names, self.row_groups, self.filepaths_or_buffers, self.pa_buffers, diff --git a/python/cudf/cudf/io/parquet.py b/python/cudf/cudf/io/parquet.py index 79afc75eb37..2a838ca7417 100644 --- a/python/cudf/cudf/io/parquet.py +++ b/python/cudf/cudf/io/parquet.py @@ -908,19 +908,20 @@ def _read_parquet( "cudf engine doesn't support the " f"following positional arguments: {list(args)}" ) - x = libparquet.ParquetReader( - filepaths_or_buffers, - columns=columns, - row_groups=row_groups, - use_pandas_metadata=use_pandas_metadata, - ) - return x.read() - # return libparquet.read_parquet( - # filepaths_or_buffers, - # columns=columns, - # row_groups=row_groups, - # use_pandas_metadata=use_pandas_metadata, - # ) + if cudf.get_option("mode.pandas_compatible"): + return libparquet.ParquetReader( + filepaths_or_buffers, + columns=columns, + row_groups=row_groups, + use_pandas_metadata=use_pandas_metadata, + ).read() + else: + return libparquet.read_parquet( + filepaths_or_buffers, + columns=columns, + row_groups=row_groups, + use_pandas_metadata=use_pandas_metadata, + ) else: if ( isinstance(filepaths_or_buffers, list) From 0aef5be60471818d7b49a99a5ffcf409fc52a363 Mon Sep 17 00:00:00 2001 From: galipremsagar Date: Mon, 17 Jun 2024 22:22:33 +0000 Subject: [PATCH 4/9] cleanup --- python/cudf/cudf/_lib/parquet.pyx | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/python/cudf/cudf/_lib/parquet.pyx b/python/cudf/cudf/_lib/parquet.pyx index 7a8a204f11e..8582f29afc0 100644 --- a/python/cudf/cudf/_lib/parquet.pyx +++ b/python/cudf/cudf/_lib/parquet.pyx @@ -915,8 +915,7 @@ cdef class ParquetReader: ) ) - df = dfs - df = _process_metadata(df, self.result_meta, self.names, self.row_groups, + df = _process_metadata(dfs, self.result_meta, self.names, self.row_groups, self.filepaths_or_buffers, self.pa_buffers, self.allow_range_index, self.cpp_use_pandas_metadata) return df From ce989c497ec1d7322b34d15cf3122acaedd1cb8e Mon Sep 17 00:00:00 2001 From: GALI PREM SAGAR Date: Tue, 25 Jun 2024 11:42:25 -0500 Subject: [PATCH 5/9] Apply suggestions from code review Co-authored-by: Muhammad Haseeb <14217455+mhaseeb123@users.noreply.github.com> --- python/cudf/cudf/_lib/parquet.pyx | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/python/cudf/cudf/_lib/parquet.pyx b/python/cudf/cudf/_lib/parquet.pyx index 8582f29afc0..b8df53ca415 100644 --- a/python/cudf/cudf/_lib/parquet.pyx +++ b/python/cudf/cudf/_lib/parquet.pyx @@ -915,10 +915,9 @@ cdef class ParquetReader: ) ) - df = _process_metadata(dfs, self.result_meta, self.names, self.row_groups, + return _process_metadata(dfs, self.result_meta, self.names, self.row_groups, 
self.filepaths_or_buffers, self.pa_buffers, self.allow_range_index, self.cpp_use_pandas_metadata) - return df cpdef merge_filemetadata(object filemetadata_list): """ From b223239036e7927a2a59cb27013b2b95f13dc292 Mon Sep 17 00:00:00 2001 From: galipremsagar Date: Tue, 25 Jun 2024 17:21:29 +0000 Subject: [PATCH 6/9] refactor --- python/cudf/cudf/_lib/parquet.pyx | 57 +++++++++++++++---------------- 1 file changed, 27 insertions(+), 30 deletions(-) diff --git a/python/cudf/cudf/_lib/parquet.pyx b/python/cudf/cudf/_lib/parquet.pyx index 8582f29afc0..cca6d4757e5 100644 --- a/python/cudf/cudf/_lib/parquet.pyx +++ b/python/cudf/cudf/_lib/parquet.pyx @@ -882,38 +882,35 @@ cdef class ParquetReader: return df def read(self): - dfs = None + dfs = self._read_chunk() while self._has_next(): - if dfs is None: - dfs = self._read_chunk() - else: - dfs = [dfs, self._read_chunk()] - concatenated_columns = [] - column_names = dfs[0]._column_names - all_columns = [] - for i in range(len(column_names)): - cols = [table._data.columns[i] for table in dfs] - all_columns.append(cols) - i = 0 - del dfs - while True: - try: - columns = all_columns[0] - except IndexError: - break - if len(columns) == 0: - break - res_col = None - res_col = concat_columns(columns) - del all_columns[0] - concatenated_columns.append(res_col.to_pylibcudf(mode="read")) - dfs = cudf.DataFrame._from_data( - *data_from_pylibcudf_table( - pylibcudf.Table(concatenated_columns), - column_names=column_names, - index_names=None - ) + dfs = [dfs, self._read_chunk()] + concatenated_columns = [] + column_names = dfs[0]._column_names + all_columns = [] + for i in range(len(column_names)): + cols = [table._data.columns[i] for table in dfs] + all_columns.append(cols) + i = 0 + del dfs + while True: + try: + columns = all_columns[0] + except IndexError: + break + if len(columns) == 0: + break + res_col = None + res_col = concat_columns(columns) + del all_columns[0] + concatenated_columns.append(res_col.to_pylibcudf(mode="read")) + dfs = cudf.DataFrame._from_data( + *data_from_pylibcudf_table( + pylibcudf.Table(concatenated_columns), + column_names=column_names, + index_names=None ) + ) df = _process_metadata(dfs, self.result_meta, self.names, self.row_groups, self.filepaths_or_buffers, self.pa_buffers, From 5c3d37dfcd764c5bcd38b1df769414fd0baa8f82 Mon Sep 17 00:00:00 2001 From: galipremsagar Date: Tue, 25 Jun 2024 17:24:58 +0000 Subject: [PATCH 7/9] style --- python/cudf/cudf/_lib/parquet.pyx | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/cudf/cudf/_lib/parquet.pyx b/python/cudf/cudf/_lib/parquet.pyx index 8190db6b995..23f904f24b1 100644 --- a/python/cudf/cudf/_lib/parquet.pyx +++ b/python/cudf/cudf/_lib/parquet.pyx @@ -913,8 +913,8 @@ cdef class ParquetReader: ) return _process_metadata(dfs, self.result_meta, self.names, self.row_groups, - self.filepaths_or_buffers, self.pa_buffers, - self.allow_range_index, self.cpp_use_pandas_metadata) + self.filepaths_or_buffers, self.pa_buffers, + self.allow_range_index, self.cpp_use_pandas_metadata) cpdef merge_filemetadata(object filemetadata_list): """ From 1412b9285f214b8dbcadeee5ee16b4dd8a9f036f Mon Sep 17 00:00:00 2001 From: galipremsagar Date: Tue, 25 Jun 2024 17:33:44 +0000 Subject: [PATCH 8/9] add test --- python/cudf/cudf/tests/test_parquet.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/python/cudf/cudf/tests/test_parquet.py b/python/cudf/cudf/tests/test_parquet.py index e1e7952605b..588bc87d268 100644 --- a/python/cudf/cudf/tests/test_parquet.py +++ 
b/python/cudf/cudf/tests/test_parquet.py @@ -3485,3 +3485,14 @@ def test_parquet_chunked_reader( ) actual = reader.read() assert_eq(expected, actual) + + +def test_parquet_reader_pandas_compatibility(): + df = pd.DataFrame( + {"a": [1, 2, 3, 4] * 10000, "b": ["av", "qw", "hi", "xyz"] * 10000} + ) + buffer = BytesIO() + df.to_parquet(buffer) + with cudf.option_context("mode.pandas_compatible", True): + expected = cudf.read_parquet(buffer) + assert_eq(expected, df) From 84efa09a91ced06dd2cbe3c2c2a8b0ab876b4f69 Mon Sep 17 00:00:00 2001 From: galipremsagar Date: Wed, 26 Jun 2024 15:27:09 +0000 Subject: [PATCH 9/9] simplify --- python/cudf/cudf/_lib/parquet.pyx | 41 +++++++++++++------------------ 1 file changed, 17 insertions(+), 24 deletions(-) diff --git a/python/cudf/cudf/_lib/parquet.pyx b/python/cudf/cudf/_lib/parquet.pyx index 458f4c33201..d1ec5be9e62 100644 --- a/python/cudf/cudf/_lib/parquet.pyx +++ b/python/cudf/cudf/_lib/parquet.pyx @@ -883,32 +883,25 @@ cdef class ParquetReader: def read(self): dfs = self._read_chunk() + column_names = dfs._column_names + concatenated_columns = list(dfs._columns) + del dfs while self._has_next(): - dfs = [dfs, self._read_chunk()] - concatenated_columns = [] - column_names = dfs[0]._column_names - all_columns = [] + new_chunk = list(self._read_chunk()._columns) for i in range(len(column_names)): - cols = [table._data.columns[i] for table in dfs] - all_columns.append(cols) - i = 0 - del dfs - while True: - try: - columns = all_columns[0] - except IndexError: - break - if len(columns) == 0: - break - res_col = None - res_col = concat_columns(columns) - del all_columns[0] - concatenated_columns.append(res_col.to_pylibcudf(mode="read")) - dfs = cudf.DataFrame._from_data( - *data_from_pylibcudf_table( - pylibcudf.Table(concatenated_columns), - column_names=column_names, - index_names=None + concatenated_columns[i] = concat_columns( + [concatenated_columns[i], new_chunk[i]] + ) + # Must drop any residual GPU columns to save memory + new_chunk[i] = None + + dfs = cudf.DataFrame._from_data( + *data_from_pylibcudf_table( + pylibcudf.Table( + [col.to_pylibcudf(mode="read") for col in concatenated_columns] + ), + column_names=column_names, + index_names=None ) )
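
The series above routes cudf.read_parquet through the chunked ParquetReader when
"mode.pandas_compatible" is enabled, concatenating each chunk column by column
instead of materializing every chunk and calling cudf.concat on whole DataFrames.
A minimal usage sketch of that code path, mirroring the test added in PATCH 8/9
(the sample data and the in-memory buffer are illustrative assumptions, not taken
from the patches beyond that test):

    from io import BytesIO

    import pandas as pd
    import cudf

    # Build a small Parquet buffer with pandas (illustrative data, same shape as the test).
    pdf = pd.DataFrame(
        {"a": [1, 2, 3, 4] * 10000, "b": ["av", "qw", "hi", "xyz"] * 10000}
    )
    buffer = BytesIO()
    pdf.to_parquet(buffer)

    # With the option enabled, _read_parquet dispatches to ParquetReader(...).read(),
    # which reads the file chunk by chunk and concatenates the chunks per column.
    with cudf.option_context("mode.pandas_compatible", True):
        gdf = cudf.read_parquet(buffer)

    assert len(gdf) == len(pdf)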