From cbeefd8f4e4e67f52331131039533ef1f0ea0a65 Mon Sep 17 00:00:00 2001
From: Matthew Murray <41342305+Matt711@users.noreply.github.com>
Date: Fri, 6 Dec 2024 16:54:54 -0500
Subject: [PATCH] Add Parquet Reader options classes to pylibcudf (#17464)

Follow-up to #17263, this PR adds the Parquet reader options classes to
pylibcudf and plumbs the changes through cuDF Python.

Authors:
  - Matthew Murray (https://github.com/Matt711)

Approvers:
  - Matthew Roeschke (https://github.com/mroeschke)
  - Yunsong Wang (https://github.com/PointKernel)
  - Nghia Truong (https://github.com/ttnghia)
  - MithunR (https://github.com/mythrocks)

URL: https://github.com/rapidsai/cudf/pull/17464
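Illustrative usage of the new API (not part of the patch): reading a file
now means building a ParquetReaderOptions via its builder, applying any
optional setters, and passing the result to read_parquet. The file path
and column names below are hypothetical.

    import pylibcudf as plc

    options = (
        plc.io.parquet.ParquetReaderOptions.builder(
            plc.io.SourceInfo(["data.parquet"])  # hypothetical input file
        )
        .use_pandas_metadata(True)   # same default the cuDF plumbing uses
        .build()
    )
    options.set_columns(["a", "b"])  # hypothetical column names
    options.set_num_rows(100)        # optional: cap the row count
    tbl_w_meta = plc.io.parquet.read_parquet(options)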
---
 cpp/include/cudf/io/parquet.hpp           |   1 +
 python/cudf/cudf/_lib/parquet.pyx         |  58 +--
 python/cudf_polars/cudf_polars/dsl/ir.py  |  44 ++-
 python/pylibcudf/pylibcudf/io/parquet.pxd |  36 +-
 python/pylibcudf/pylibcudf/io/parquet.pyi |  21 +-
 python/pylibcudf/pylibcudf/io/parquet.pyx | 339 +++++++++++-------
 .../pylibcudf/tests/io/test_parquet.py    |  28 +-
 7 files changed, 333 insertions(+), 194 deletions(-)

diff --git a/cpp/include/cudf/io/parquet.hpp b/cpp/include/cudf/io/parquet.hpp
index bfe76d5690c..b561d0989e9 100644
--- a/cpp/include/cudf/io/parquet.hpp
+++ b/cpp/include/cudf/io/parquet.hpp
@@ -410,6 +410,7 @@ class parquet_reader_options_builder {
    *
    * @param val Boolean value whether to read matching projected and filter columns from mismatched
    * Parquet sources.
+   *
    * @return this for chaining.
    */
   parquet_reader_options_builder& allow_mismatched_pq_schemas(bool val)
diff --git a/python/cudf/cudf/_lib/parquet.pyx b/python/cudf/cudf/_lib/parquet.pyx
index c77c9875342..1b4c18d13a7 100644
--- a/python/cudf/cudf/_lib/parquet.pyx
+++ b/python/cudf/cudf/_lib/parquet.pyx
@@ -205,7 +205,7 @@ cdef object _process_metadata(object df,
         else:
             start = range_index_meta["start"] + skip_rows
             stop = range_index_meta["stop"]
-            if nrows != -1:
+            if nrows > -1:
                 stop = start + nrows
             idx = cudf.RangeIndex(
                 start=start,
@@ -256,16 +256,27 @@ def read_parquet_chunked(
     # (see read_parquet)
     allow_range_index = columns is not None and len(columns) != 0
 
+    options = (
+        plc.io.parquet.ParquetReaderOptions.builder(
+            plc.io.SourceInfo(filepaths_or_buffers)
+        )
+        .use_pandas_metadata(use_pandas_metadata)
+        .allow_mismatched_pq_schemas(allow_mismatched_pq_schemas)
+        .build()
+    )
+    if row_groups is not None:
+        options.set_row_groups(row_groups)
+    if nrows > -1:
+        options.set_num_rows(nrows)
+    if skip_rows != 0:
+        options.set_skip_rows(skip_rows)
+    if columns is not None:
+        options.set_columns(columns)
+
     reader = ChunkedParquetReader(
-        plc.io.SourceInfo(filepaths_or_buffers),
-        columns,
-        row_groups,
-        use_pandas_metadata=use_pandas_metadata,
+        options,
         chunk_read_limit=chunk_read_limit,
         pass_read_limit=pass_read_limit,
-        skip_rows=skip_rows,
-        nrows=nrows,
-        allow_mismatched_pq_schemas=allow_mismatched_pq_schemas,
     )
 
     tbl_w_meta = reader.read_chunk()
@@ -325,19 +336,26 @@ cpdef read_parquet(filepaths_or_buffers, columns=None, row_groups=None,
     if columns is not None and len(columns) == 0 or filters:
         allow_range_index = False
 
-    # Read Parquet
-
-    tbl_w_meta = plc.io.parquet.read_parquet(
-        plc.io.SourceInfo(filepaths_or_buffers),
-        columns,
-        row_groups,
-        filters,
-        convert_strings_to_categories = False,
-        use_pandas_metadata = use_pandas_metadata,
-        skip_rows = skip_rows,
-        nrows = nrows,
-        allow_mismatched_pq_schemas=allow_mismatched_pq_schemas,
+    options = (
+        plc.io.parquet.ParquetReaderOptions.builder(
+            plc.io.SourceInfo(filepaths_or_buffers)
+        )
+        .use_pandas_metadata(use_pandas_metadata)
+        .allow_mismatched_pq_schemas(allow_mismatched_pq_schemas)
+        .build()
     )
+    if row_groups is not None:
+        options.set_row_groups(row_groups)
+    if nrows > -1:
+        options.set_num_rows(nrows)
+    if skip_rows != 0:
+        options.set_skip_rows(skip_rows)
+    if columns is not None:
+        options.set_columns(columns)
+    if filters is not None:
+        options.set_filter(filters)
+
+    tbl_w_meta = plc.io.parquet.read_parquet(options)
 
     df = cudf.DataFrame._from_data(
         *data_from_pylibcudf_io(tbl_w_meta)
diff --git a/python/cudf_polars/cudf_polars/dsl/ir.py b/python/cudf_polars/cudf_polars/dsl/ir.py
index 1faa778ccf6..b5af3bb80bf 100644
--- a/python/cudf_polars/cudf_polars/dsl/ir.py
+++ b/python/cudf_polars/cudf_polars/dsl/ir.py
@@ -517,17 +517,22 @@ def do_evaluate(
         elif typ == "parquet":
             parquet_options = config_options.get("parquet_options", {})
             if parquet_options.get("chunked", True):
+                options = plc.io.parquet.ParquetReaderOptions.builder(
+                    plc.io.SourceInfo(paths)
+                ).build()
+                # We handle skip_rows != 0 by reading up to
+                # n_rows + skip_rows rows and slicing off the
+                # first skip_rows entries.
+                # TODO: Remove this workaround once
+                # https://github.com/rapidsai/cudf/issues/16186
+                # is fixed
+                nrows = n_rows + skip_rows
+                if nrows > -1:
+                    options.set_num_rows(nrows)
+                if with_columns is not None:
+                    options.set_columns(with_columns)
                 reader = plc.io.parquet.ChunkedParquetReader(
-                    plc.io.SourceInfo(paths),
-                    columns=with_columns,
-                    # We handle skip_rows != 0 by reading from the
-                    # up to n_rows + skip_rows and slicing off the
-                    # first skip_rows entries.
-                    # TODO: Remove this workaround once
-                    # https://github.com/rapidsai/cudf/issues/16186
-                    # is fixed
-                    nrows=n_rows + skip_rows,
-                    skip_rows=0,
+                    options,
                     chunk_read_limit=parquet_options.get(
                         "chunk_read_limit", cls.PARQUET_DEFAULT_CHUNK_SIZE
                     ),
@@ -573,13 +578,18 @@ def slice_skip(tbl: plc.Table):
             if predicate is not None and row_index is None:
                 # Can't apply filters during read if we have a row index.
                 filters = to_parquet_filter(predicate.value)
-            tbl_w_meta = plc.io.parquet.read_parquet(
-                plc.io.SourceInfo(paths),
-                columns=with_columns,
-                filters=filters,
-                nrows=n_rows,
-                skip_rows=skip_rows,
-            )
+            options = plc.io.parquet.ParquetReaderOptions.builder(
+                plc.io.SourceInfo(paths)
+            ).build()
+            if n_rows != -1:
+                options.set_num_rows(n_rows)
+            if skip_rows != 0:
+                options.set_skip_rows(skip_rows)
+            if with_columns is not None:
+                options.set_columns(with_columns)
+            if filters is not None:
+                options.set_filter(filters)
+            tbl_w_meta = plc.io.parquet.read_parquet(options)
             df = DataFrame.from_table(
                 tbl_w_meta.tbl,
                 # TODO: consider nested column names?
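For context on the workaround in the ir.py hunk above: the chunked path
reads n_rows + skip_rows rows and then slices off the leading skip_rows
rows. A minimal sketch of that slicing step, assuming pylibcudf's
copying.slice API (the helper name mirrors the slice_skip shown in the
hunk context; this sketch is not part of the patch):

    import pylibcudf as plc

    def slice_skip(tbl: plc.Table, skip_rows: int) -> plc.Table:
        # plc.copying.slice returns a list of table slices; take the
        # single slice covering [skip_rows, num_rows).
        if skip_rows != 0:
            (tbl,) = plc.copying.slice(tbl, [skip_rows, tbl.num_rows()])
        return tbl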
diff --git a/python/pylibcudf/pylibcudf/io/parquet.pxd b/python/pylibcudf/pylibcudf/io/parquet.pxd
index 7bd6ba91ca9..84f47cf5305 100644
--- a/python/pylibcudf/pylibcudf/io/parquet.pxd
+++ b/python/pylibcudf/pylibcudf/io/parquet.pxd
@@ -19,6 +19,8 @@ from pylibcudf.libcudf.io.parquet cimport (
     chunked_parquet_reader as cpp_chunked_parquet_reader,
     parquet_writer_options,
     parquet_writer_options_builder,
+    parquet_reader_options,
+    parquet_reader_options_builder,
     chunked_parquet_writer_options,
     chunked_parquet_writer_options_builder,
 )
@@ -27,6 +29,25 @@ from pylibcudf.table cimport Table
 from pylibcudf.types cimport DataType
 
 
+cdef class ParquetReaderOptions:
+    cdef parquet_reader_options c_obj
+    cdef SourceInfo source
+    cpdef void set_row_groups(self, list row_groups)
+    cpdef void set_num_rows(self, size_type nrows)
+    cpdef void set_skip_rows(self, int64_t skip_rows)
+    cpdef void set_columns(self, list col_names)
+    cpdef void set_filter(self, Expression filter)
+
+cdef class ParquetReaderOptionsBuilder:
+    cdef parquet_reader_options_builder c_obj
+    cdef SourceInfo source
+    cpdef ParquetReaderOptionsBuilder convert_strings_to_categories(self, bool val)
+    cpdef ParquetReaderOptionsBuilder use_pandas_metadata(self, bool val)
+    cpdef ParquetReaderOptionsBuilder allow_mismatched_pq_schemas(self, bool val)
+    cpdef ParquetReaderOptionsBuilder use_arrow_schema(self, bool val)
+    cpdef build(self)
+
+
 cdef class ChunkedParquetReader:
     cdef unique_ptr[cpp_chunked_parquet_reader] reader
 
@@ -34,20 +55,7 @@ cdef class ChunkedParquetReader:
     cpdef TableWithMetadata read_chunk(self)
 
 
-cpdef read_parquet(
-    SourceInfo source_info,
-    list columns = *,
-    list row_groups = *,
-    Expression filters = *,
-    bool convert_strings_to_categories = *,
-    bool use_pandas_metadata = *,
-    int64_t skip_rows = *,
-    size_type nrows = *,
-    bool allow_mismatched_pq_schemas = *,
-    # disabled see comment in parquet.pyx for more
-    # ReaderColumnSchema reader_column_schema = *,
-    # DataType timestamp_type = *
-)
+cpdef read_parquet(ParquetReaderOptions options)
 
 
 cdef class ParquetChunkedWriter:
diff --git a/python/pylibcudf/pylibcudf/io/parquet.pyi b/python/pylibcudf/pylibcudf/io/parquet.pyi
index 22bea1abd8e..2d8d12c1a45 100644
--- a/python/pylibcudf/pylibcudf/io/parquet.pyi
+++ b/python/pylibcudf/pylibcudf/io/parquet.pyi
@@ -1,7 +1,8 @@
 # Copyright (c) 2024, NVIDIA CORPORATION.
 
 from collections.abc import Mapping
-from typing import Self
+
+from typing_extensions import Self
 
 from pylibcudf.expressions import Expression
 from pylibcudf.io.types import (
@@ -16,6 +17,24 @@ from pylibcudf.io.types import (
 )
 from pylibcudf.table import Table
 
+class ParquetReaderOptions:
+    def __init__(self): ...
+    def set_row_groups(self, row_groups: list[list[int]]): ...
+    def set_num_rows(self, nrows: int): ...
+    def set_skip_rows(self, skip_rows: int): ...
+    def set_columns(self, col_names: list[str]): ...
+    def set_filter(self, filter: Expression): ...
+    @staticmethod
+    def builder(source: SourceInfo) -> ParquetReaderOptionsBuilder: ...
+
+class ParquetReaderOptionsBuilder:
+    def __init__(self): ...
+    def convert_strings_to_categories(self, val: bool) -> Self: ...
+    def use_pandas_metadata(self, val: bool) -> Self: ...
+    def allow_mismatched_pq_schemas(self, val: bool) -> Self: ...
+    def use_arrow_schema(self, val: bool) -> Self: ...
+    def build(self) -> ParquetReaderOptions: ...
+
 class ChunkedParquetReader:
     def __init__(
         self,
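A side note on the stub changes above: `Self` now comes from
typing_extensions so the stubs type-check on Python versions before 3.11.
A toy sketch (illustrative only, not pylibcudf code) of why the builder
methods are annotated `-> Self`:

    # Returning Self keeps chained calls correctly typed, including in
    # any subclass of the builder.
    from typing_extensions import Self

    class Builder:
        def __init__(self) -> None:
            self._opts: dict[str, bool] = {}

        def use_pandas_metadata(self, val: bool) -> Self:
            self._opts["use_pandas_metadata"] = val
            return self

        def build(self) -> dict[str, bool]:
            return self._opts

    opts = Builder().use_pandas_metadata(True).build()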
diff --git a/python/pylibcudf/pylibcudf/io/parquet.pyx b/python/pylibcudf/pylibcudf/io/parquet.pyx
index 9bdf849a30c..672fe2be847 100644
--- a/python/pylibcudf/pylibcudf/io/parquet.pyx
+++ b/python/pylibcudf/pylibcudf/io/parquet.pyx
@@ -42,47 +42,204 @@ __all__ = [
     "ParquetWriterOptionsBuilder",
     "read_parquet",
     "write_parquet",
+    "ParquetReaderOptions",
+    "ParquetReaderOptionsBuilder",
     "ChunkedParquetWriterOptions",
     "ChunkedParquetWriterOptionsBuilder",
     "merge_row_group_metadata",
 ]
 
-cdef parquet_reader_options _setup_parquet_reader_options(
-    SourceInfo source_info,
-    list columns = None,
-    list row_groups = None,
-    Expression filters = None,
-    bool convert_strings_to_categories = False,
-    bool use_pandas_metadata = True,
-    int64_t skip_rows = 0,
-    size_type nrows = -1,
-    bool allow_mismatched_pq_schemas=False,
-    # ReaderColumnSchema reader_column_schema = None,
-    # DataType timestamp_type = DataType(type_id.EMPTY)
-):
-    cdef vector[string] col_vec
-    cdef parquet_reader_options opts = (
-        parquet_reader_options.builder(source_info.c_obj)
-        .convert_strings_to_categories(convert_strings_to_categories)
-        .use_pandas_metadata(use_pandas_metadata)
-        .allow_mismatched_pq_schemas(allow_mismatched_pq_schemas)
-        .use_arrow_schema(True)
-        .build()
-    )
-    if row_groups is not None:
-        opts.set_row_groups(row_groups)
-    if nrows != -1:
-        opts.set_num_rows(nrows)
-    if skip_rows != 0:
-        opts.set_skip_rows(skip_rows)
-    if columns is not None:
-        col_vec.reserve(len(columns))
-        for col in columns:
-            col_vec.push_back(str(col).encode())
-        opts.set_columns(col_vec)
-    if filters is not None:
-        opts.set_filter(dereference(filters.c_obj.get()))
-    return opts
+
+cdef class ParquetReaderOptions:
+    """The settings to use for ``read_parquet``
+
+    For details, see :cpp:class:`cudf::io::parquet_reader_options`
+    """
+    @staticmethod
+    def builder(SourceInfo source):
+        """
+        Create a ParquetReaderOptionsBuilder object
+
+        For details, see :cpp:func:`cudf::io::parquet_reader_options::builder`
+
+        Parameters
+        ----------
+        source : SourceInfo
+            The source to read the Parquet file from.
+
+        Returns
+        -------
+        ParquetReaderOptionsBuilder
+            Builder to build ParquetReaderOptions
+        """
+        cdef ParquetReaderOptionsBuilder parquet_builder = (
+            ParquetReaderOptionsBuilder.__new__(ParquetReaderOptionsBuilder)
+        )
+        parquet_builder.c_obj = parquet_reader_options.builder(source.c_obj)
+        parquet_builder.source = source
+        return parquet_builder
+
+    cpdef void set_row_groups(self, list row_groups):
+        """
+        Sets list of individual row groups to read.
+
+        Parameters
+        ----------
+        row_groups : list
+            List of row groups to read
+
+        Returns
+        -------
+        None
+        """
+        cdef vector[vector[size_type]] outer
+        cdef vector[size_type] inner
+        for row_group in row_groups:
+            for x in row_group:
+                inner.push_back(x)
+            outer.push_back(inner)
+            inner.clear()
+
+        self.c_obj.set_row_groups(outer)
+
+    cpdef void set_num_rows(self, size_type nrows):
+        """
+        Sets number of rows to read.
+
+        Parameters
+        ----------
+        nrows : size_type
+            Number of rows to read after skip
+
+        Returns
+        -------
+        None
+        """
+        self.c_obj.set_num_rows(nrows)
+
+    cpdef void set_skip_rows(self, int64_t skip_rows):
+        """
+        Sets number of rows to skip.
+
+        Parameters
+        ----------
+        skip_rows : int64_t
+            Number of rows to skip from start
+
+        Returns
+        -------
+        None
+        """
+        self.c_obj.set_skip_rows(skip_rows)
+
+    cpdef void set_columns(self, list col_names):
+        """
+        Sets names of the columns to be read.
+
+        Parameters
+        ----------
+        col_names : list
+            List of column names
+
+        Returns
+        -------
+        None
+        """
+        cdef vector[string] vec
+        for name in col_names:
+            vec.push_back(str(name).encode())
+        self.c_obj.set_columns(vec)
+
+    cpdef void set_filter(self, Expression filter):
+        """
+        Sets AST based filter for predicate pushdown.
+
+        Parameters
+        ----------
+        filter : Expression
+            AST expression to use as filter
+
+        Returns
+        -------
+        None
+        """
+        self.c_obj.set_filter(dereference(filter.c_obj.get()))
+
+
+cdef class ParquetReaderOptionsBuilder:
+    cpdef ParquetReaderOptionsBuilder convert_strings_to_categories(self, bool val):
+        """
+        Enable/disable conversion of strings to categories.
+
+        Parameters
+        ----------
+        val : bool
+            Boolean value to enable/disable conversion of string columns to categories
+
+        Returns
+        -------
+        ParquetReaderOptionsBuilder
+        """
+        self.c_obj.convert_strings_to_categories(val)
+        return self
+
+    cpdef ParquetReaderOptionsBuilder use_pandas_metadata(self, bool val):
+        """
+        Enable/disable use of pandas metadata when reading.
+
+        Parameters
+        ----------
+        val : bool
+            Boolean value whether to use pandas metadata
+
+        Returns
+        -------
+        ParquetReaderOptionsBuilder
+        """
+        self.c_obj.use_pandas_metadata(val)
+        return self
+
+    cpdef ParquetReaderOptionsBuilder allow_mismatched_pq_schemas(self, bool val):
+        """
+        Enable/disable reading of matching projected and filter
+        columns from mismatched Parquet sources.
+
+        Parameters
+        ----------
+        val : bool
+            Boolean value whether to read matching projected and filter
+            columns from mismatched Parquet sources.
+
+        Returns
+        -------
+        ParquetReaderOptionsBuilder
+        """
+        self.c_obj.allow_mismatched_pq_schemas(val)
+        return self
+
+    cpdef ParquetReaderOptionsBuilder use_arrow_schema(self, bool val):
+        """
+        Enable/disable use of the Arrow schema when reading.
+
+        Parameters
+        ----------
+        val : bool
+            Boolean value whether to use the Arrow schema
+
+        Returns
+        -------
+        ParquetReaderOptionsBuilder
+        """
+        self.c_obj.use_arrow_schema(val)
+        return self
+
+    cpdef build(self):
+        """Create a ParquetReaderOptions object"""
+        cdef ParquetReaderOptions parquet_options = ParquetReaderOptions.__new__(
+            ParquetReaderOptions
+        )
+        parquet_options.c_obj = move(self.c_obj.build())
+        parquet_options.source = self.source
+        return parquet_options
 
 
 cdef class ChunkedParquetReader:
@@ -93,63 +250,27 @@ cdef class ChunkedParquetReader:
 
     Parameters
    ----------
-    source_info : SourceInfo
-        The SourceInfo object to read the Parquet file from.
-    columns : list, default None
-        The names of the columns to be read
-    row_groups : list[list[size_type]], default None
-        List of row groups to be read.
-    use_pandas_metadata : bool, default True
-        If True, return metadata about the index column in
-        the per-file user metadata of the ``TableWithMetadata``
-    convert_strings_to_categories : bool, default False
-        Whether to convert string columns to the category type
-    skip_rows : int64_t, default 0
-        The number of rows to skip from the start of the file.
-    nrows : size_type, default -1
-        The number of rows to read. By default, read the entire file.
+    options : ParquetReaderOptions
+        Settings for controlling reading behavior
     chunk_read_limit : size_t, default 0
         Limit on total number of bytes to be returned per read,
         or 0 if there is no limit.
     pass_read_limit : size_t, default 1024000000
         Limit on the amount of memory used for reading and decompressing data
         or 0 if there is no limit.
-    allow_mismatched_pq_schemas : bool, default False
-        Whether to read (matching) columns specified in `columns` from
-        the input files with otherwise mismatched schemas.
     """
     def __init__(
         self,
-        SourceInfo source_info,
-        list columns=None,
-        list row_groups=None,
-        bool use_pandas_metadata=True,
-        bool convert_strings_to_categories=False,
-        int64_t skip_rows = 0,
-        size_type nrows = -1,
+        ParquetReaderOptions options,
         size_t chunk_read_limit=0,
         size_t pass_read_limit=1024000000,
-        bool allow_mismatched_pq_schemas=False
     ):
-
-        cdef parquet_reader_options opts = _setup_parquet_reader_options(
-            source_info,
-            columns,
-            row_groups,
-            filters=None,
-            convert_strings_to_categories=convert_strings_to_categories,
-            use_pandas_metadata=use_pandas_metadata,
-            skip_rows=skip_rows,
-            nrows=nrows,
-            allow_mismatched_pq_schemas=allow_mismatched_pq_schemas,
-        )
-
         with nogil:
             self.reader.reset(
                 new cpp_chunked_parquet_reader(
                     chunk_read_limit,
                     pass_read_limit,
-                    opts
+                    options.c_obj,
                 )
            )
 
@@ -184,69 +305,23 @@ cdef class ChunkedParquetReader:
 
         return TableWithMetadata.from_libcudf(c_result)
 
-cpdef read_parquet(
-    SourceInfo source_info,
-    list columns = None,
-    list row_groups = None,
-    Expression filters = None,
-    bool convert_strings_to_categories = False,
-    bool use_pandas_metadata = True,
-    int64_t skip_rows = 0,
-    size_type nrows = -1,
-    bool allow_mismatched_pq_schemas = False,
-    # Disabled, these aren't used by cudf-python
-    # we should only add them back in if there's user demand
-    # ReaderColumnSchema reader_column_schema = None,
-    # DataType timestamp_type = DataType(type_id.EMPTY)
-):
-    """Reads an Parquet file into a :py:class:`~.types.TableWithMetadata`.
+
+cpdef read_parquet(ParquetReaderOptions options):
+    """
+    Read from Parquet format.
+
+    The source to read from and options are encapsulated
+    by the `options` object. For details, see :cpp:func:`read_parquet`.
 
     Parameters
     ----------
-    source_info : SourceInfo
-        The SourceInfo object to read the Parquet file from.
-    columns : list, default None
-        The string names of the columns to be read.
-    row_groups : list[list[size_type]], default None
-        List of row groups to be read.
-    filters : Expression, default None
-        An AST :py:class:`pylibcudf.expressions.Expression`
-        to use for predicate pushdown.
-    convert_strings_to_categories : bool, default False
-        Whether to convert string columns to the category type
-    use_pandas_metadata : bool, default True
-        If True, return metadata about the index column in
-        the per-file user metadata of the ``TableWithMetadata``
-    skip_rows : int64_t, default 0
-        The number of rows to skip from the start of the file.
-    nrows : size_type, default -1
-        The number of rows to read. By default, read the entire file.
-    allow_mismatched_pq_schemas : bool, default False
-        If True, enable reading (matching) columns specified in `columns`
-        from the input files with otherwise mismatched schemas.
-
-    Returns
-    -------
-    TableWithMetadata
-        The Table and its corresponding metadata (column names) that were read in.
+    options : ParquetReaderOptions
+        Settings for controlling reading behavior
+
+    Returns
+    -------
+    TableWithMetadata
+        The Table and its corresponding metadata (column names) that were read in.
     """
-    cdef table_with_metadata c_result
-    cdef parquet_reader_options opts = _setup_parquet_reader_options(
-        source_info,
-        columns,
-        row_groups,
-        filters,
-        convert_strings_to_categories,
-        use_pandas_metadata,
-        skip_rows,
-        nrows,
-        allow_mismatched_pq_schemas,
-    )
-
     with nogil:
-        c_result = move(cpp_read_parquet(opts))
+        c_result = move(cpp_read_parquet(options.c_obj))
 
     return TableWithMetadata.from_libcudf(c_result)
diff --git a/python/pylibcudf/pylibcudf/tests/io/test_parquet.py b/python/pylibcudf/pylibcudf/tests/io/test_parquet.py
index 94524acbcc8..da535809745 100644
--- a/python/pylibcudf/pylibcudf/tests/io/test_parquet.py
+++ b/python/pylibcudf/pylibcudf/tests/io/test_parquet.py
@@ -31,19 +31,24 @@ def test_read_parquet_basic(
         binary_source_or_sink, pa_table, **_COMMON_PARQUET_SOURCE_KWARGS
     )
 
-    res = plc.io.parquet.read_parquet(
-        plc.io.SourceInfo([source]),
-        nrows=nrows,
-        skip_rows=skiprows,
-        columns=columns,
-    )
+    options = plc.io.parquet.ParquetReaderOptions.builder(
+        plc.io.SourceInfo([source])
+    ).build()
+    if nrows > -1:
+        options.set_num_rows(nrows)
+    if skiprows != 0:
+        options.set_skip_rows(skiprows)
+    if columns is not None:
+        options.set_columns(columns)
+
+    res = plc.io.parquet.read_parquet(options)
 
     if columns is not None:
         pa_table = pa_table.select(columns)
 
     # Adapt to nrows/skiprows
     pa_table = pa_table.slice(
-        offset=skiprows, length=nrows if nrows != -1 else None
+        offset=skiprows, length=nrows if nrows > -1 else None
     )
 
     assert_table_and_meta_eq(pa_table, res, check_field_nullability=False)
@@ -95,9 +100,12 @@ def test_read_parquet_filters(
         binary_source_or_sink, pa_table, **_COMMON_PARQUET_SOURCE_KWARGS
     )
 
-    plc_table_w_meta = plc.io.parquet.read_parquet(
-        plc.io.SourceInfo([source]), filters=plc_filters
-    )
+    options = plc.io.parquet.ParquetReaderOptions.builder(
+        plc.io.SourceInfo([source])
+    ).build()
+    options.set_filter(plc_filters)
+
+    plc_table_w_meta = plc.io.parquet.read_parquet(options)
 
     exp = read_table(source, filters=pa_filters)
     assert_table_and_meta_eq(
         exp, plc_table_w_meta, check_field_nullability=False
     )
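To round out the reader-side changes, a hedged sketch of driving the new
chunked API end to end, assuming the reader's read_chunk/has_next
interface (the path is hypothetical; the loop mirrors the pattern used by
cuDF's read_parquet_chunked above):

    import pylibcudf as plc

    options = plc.io.parquet.ParquetReaderOptions.builder(
        plc.io.SourceInfo(["data.parquet"])  # hypothetical path
    ).build()
    reader = plc.io.parquet.ChunkedParquetReader(
        options,
        chunk_read_limit=0,          # 0 = no per-chunk byte limit
        pass_read_limit=1024000000,  # default decompression budget
    )
    tbls = [reader.read_chunk()]
    while reader.has_next():
        tbls.append(reader.read_chunk())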