From b6f7e6ea33d8f516033508224cd89bbd09a791ee Mon Sep 17 00:00:00 2001 From: Matthew Roeschke <10647082+mroeschke@users.noreply.github.com> Date: Fri, 6 Dec 2024 12:55:22 -0800 Subject: [PATCH 1/7] Remove cudf._lib.orc in favor of inlining pylibcudf (#17466) Contributes to https://github.com/rapidsai/cudf/issues/17317 Authors: - Matthew Roeschke (https://github.com/mroeschke) Approvers: - Lawrence Mitchell (https://github.com/wence-) URL: https://github.com/rapidsai/cudf/pull/17466 --- python/cudf/cudf/_lib/CMakeLists.txt | 1 - python/cudf/cudf/_lib/__init__.py | 1 - python/cudf/cudf/_lib/orc.pyx | 466 ------------------ python/cudf/cudf/io/orc.py | 613 +++++++++++++++++------- python/cudf/cudf/utils/ioutils.py | 161 ++++++- python/pylibcudf/pylibcudf/io/types.pxd | 1 - python/pylibcudf/pylibcudf/io/types.pyi | 2 + python/pylibcudf/pylibcudf/io/types.pyx | 6 +- 8 files changed, 603 insertions(+), 648 deletions(-) delete mode 100644 python/cudf/cudf/_lib/orc.pyx diff --git a/python/cudf/cudf/_lib/CMakeLists.txt b/python/cudf/cudf/_lib/CMakeLists.txt index cff25f5752c..e98cf283bbb 100644 --- a/python/cudf/cudf/_lib/CMakeLists.txt +++ b/python/cudf/cudf/_lib/CMakeLists.txt @@ -18,7 +18,6 @@ set(cython_sources csv.pyx groupby.pyx interop.pyx - orc.pyx parquet.pyx reduce.pyx scalar.pyx diff --git a/python/cudf/cudf/_lib/__init__.py b/python/cudf/cudf/_lib/__init__.py index 05310d8d232..4758a933898 100644 --- a/python/cudf/cudf/_lib/__init__.py +++ b/python/cudf/cudf/_lib/__init__.py @@ -7,7 +7,6 @@ groupby, interop, nvtext, - orc, parquet, reduce, sort, diff --git a/python/cudf/cudf/_lib/orc.pyx b/python/cudf/cudf/_lib/orc.pyx deleted file mode 100644 index c829cac6409..00000000000 --- a/python/cudf/cudf/_lib/orc.pyx +++ /dev/null @@ -1,466 +0,0 @@ -# Copyright (c) 2020-2024, NVIDIA CORPORATION. - -from libc.stdint cimport int64_t -from libcpp cimport bool, int -from libcpp.map cimport map -from libcpp.string cimport string -from libcpp.vector cimport vector -import itertools -from collections import OrderedDict - -try: - import ujson as json -except ImportError: - import json - -cimport pylibcudf.libcudf.lists.lists_column_view as cpp_lists_column_view - -from cudf._lib.column cimport Column -from cudf._lib.io.utils cimport update_col_struct_field_names -from cudf._lib.utils cimport data_from_pylibcudf_io - -import pylibcudf as plc - -import cudf -from cudf._lib.types import SUPPORTED_NUMPY_TO_PYLIBCUDF_TYPES -from cudf._lib.utils import _index_level_name, generate_pandas_metadata -from cudf.core.buffer import acquire_spill_lock -from pylibcudf.io.types cimport TableInputMetadata, SinkInfo, ColumnInMetadata -from pylibcudf.io.orc cimport OrcChunkedWriter - -# TODO: Consider inlining this function since it seems to only be used in one place. -cpdef read_parsed_orc_statistics(filepath_or_buffer): - """ - Cython function to call into libcudf API, see `read_parsed_orc_statistics`. - - See Also - -------- - cudf.io.orc.read_orc_statistics - """ - - parsed = ( - plc.io.orc.read_parsed_orc_statistics( - plc.io.SourceInfo([filepath_or_buffer]) - ) - ) - - return parsed.column_names, parsed.file_stats, parsed.stripes_stats - - -cpdef read_orc(object filepaths_or_buffers, - object columns=None, - object stripes=None, - object skip_rows=None, - object num_rows=None, - bool use_index=True, - object timestamp_type=None): - """ - Cython function to call into libcudf API, see `read_orc`. 
- - See Also - -------- - cudf.read_orc - - Notes - ----- - Currently this function only considers the metadata of the first file in the list of - filepaths_or_buffers. - """ - - if columns is not None: - columns = [str(col) for col in columns] - - tbl_w_meta = plc.io.orc.read_orc( - plc.io.SourceInfo(filepaths_or_buffers), - columns, - stripes, - get_skiprows_arg(skip_rows), - get_num_rows_arg(num_rows), - use_index, - plc.types.DataType( - SUPPORTED_NUMPY_TO_PYLIBCUDF_TYPES[ - cudf.dtype(timestamp_type) - ] - ) - ) - - names = tbl_w_meta.column_names(include_children=False) - - actual_index_names, col_names, is_range_index, reset_index_name, \ - range_idx = _get_index_from_metadata(tbl_w_meta.per_file_user_data, - names, - skip_rows, - num_rows) - - if columns is not None and (isinstance(columns, list) and len(columns) == 0): - # When `columns=[]`, index needs to be - # established, but not the columns. - nrows = tbl_w_meta.tbl.num_rows() - return {}, cudf.RangeIndex(nrows) - - data, index = data_from_pylibcudf_io( - tbl_w_meta, - col_names if columns is None else names, - actual_index_names - ) - - if is_range_index: - index = range_idx - elif reset_index_name: - index.names = [None] * len(index.names) - - child_name_values = tbl_w_meta.child_names.values() - - data = { - name: update_col_struct_field_names( - col, child_names - ) - for (name, col), child_names in zip(data.items(), child_name_values) - } - - return data, index - - -def _get_comp_type(object compression): - if compression is None or compression is False: - return plc.io.types.CompressionType.NONE - - compression = str(compression).upper() - if compression == "SNAPPY": - return plc.io.types.CompressionType.SNAPPY - elif compression == "ZLIB": - return plc.io.types.CompressionType.ZLIB - elif compression == "ZSTD": - return plc.io.types.CompressionType.ZSTD - elif compression == "LZ4": - return plc.io.types.CompressionType.LZ4 - else: - raise ValueError(f"Unsupported `compression` type {compression}") - - -cdef tuple _get_index_from_metadata( - vector[map[string, string]] user_data, - object names, - object skip_rows, - object num_rows): - - meta = None - index_col = None - is_range_index = False - reset_index_name = False - range_idx = None - - if user_data.size() > 0: - json_str = user_data[0][b'pandas'].decode('utf-8') - if json_str != "": - meta = json.loads(json_str) - if 'index_columns' in meta and len(meta['index_columns']) > 0: - index_col = meta['index_columns'] - if isinstance(index_col[0], dict) and \ - index_col[0]['kind'] == 'range': - is_range_index = True - else: - index_col_names = OrderedDict() - for idx_col in index_col: - for c in meta['columns']: - if c['field_name'] == idx_col: - index_col_names[idx_col] = \ - c['name'] or c['field_name'] - if c['name'] is None: - reset_index_name = True - - actual_index_names = None - if index_col is not None and len(index_col) > 0: - if is_range_index: - range_index_meta = index_col[0] - range_idx = cudf.RangeIndex( - start=range_index_meta['start'], - stop=range_index_meta['stop'], - step=range_index_meta['step'], - name=range_index_meta['name'] - ) - if skip_rows is not None: - range_idx = range_idx[skip_rows:] - if num_rows is not None: - range_idx = range_idx[:num_rows] - else: - actual_index_names = list(index_col_names.values()) - names = names[len(actual_index_names):] - - return ( - actual_index_names, - names, - is_range_index, - reset_index_name, - range_idx - ) - - -def _get_orc_stat_freq(str statistics): - """ - Convert ORC statistics terms to CUDF 
convention: - - ORC "STRIPE" == CUDF "ROWGROUP" - - ORC "ROWGROUP" == CUDF "PAGE" - """ - statistics = str(statistics).upper() - if statistics == "NONE": - return plc.io.types.StatisticsFreq.STATISTICS_NONE - elif statistics == "STRIPE": - return plc.io.types.StatisticsFreq.STATISTICS_ROWGROUP - elif statistics == "ROWGROUP": - return plc.io.types.StatisticsFreq.STATISTICS_PAGE - else: - raise ValueError(f"Unsupported `statistics_freq` type {statistics}") - - -@acquire_spill_lock() -def write_orc( - table, - object path_or_buf, - object compression="snappy", - str statistics="ROWGROUP", - object stripe_size_bytes=None, - object stripe_size_rows=None, - object row_index_stride=None, - object cols_as_map_type=None, - object index=None -): - """ - Cython function to call into libcudf API, see `cudf::io::write_orc`. - - See Also - -------- - cudf.read_orc - """ - user_data = {} - user_data["pandas"] = generate_pandas_metadata(table, index) - if index is True or ( - index is None and not isinstance(table._index, cudf.RangeIndex) - ): - columns = table._columns if table._index is None else [ - *table.index._columns, *table._columns - ] - plc_table = plc.Table([col.to_pylibcudf(mode="read") for col in columns]) - tbl_meta = TableInputMetadata(plc_table) - for level, idx_name in enumerate(table._index.names): - tbl_meta.column_metadata[level].set_name( - _index_level_name(idx_name, level, table._column_names) - ) - num_index_cols_meta = len(table._index.names) - else: - plc_table = plc.Table( - [col.to_pylibcudf(mode="read") for col in table._columns] - ) - tbl_meta = TableInputMetadata(plc_table) - num_index_cols_meta = 0 - - if cols_as_map_type is not None: - cols_as_map_type = set(cols_as_map_type) - - for i, name in enumerate(table._column_names, num_index_cols_meta): - tbl_meta.column_metadata[i].set_name(name) - _set_col_children_metadata( - table[name]._column, - tbl_meta.column_metadata[i], - (cols_as_map_type is not None) - and (name in cols_as_map_type), - ) - - options = ( - plc.io.orc.OrcWriterOptions.builder( - plc.io.SinkInfo([path_or_buf]), plc_table - ) - .metadata(tbl_meta) - .key_value_metadata(user_data) - .compression(_get_comp_type(compression)) - .enable_statistics(_get_orc_stat_freq(statistics)) - .build() - ) - if stripe_size_bytes is not None: - options.set_stripe_size_bytes(stripe_size_bytes) - if stripe_size_rows is not None: - options.set_stripe_size_rows(stripe_size_rows) - if row_index_stride is not None: - options.set_row_index_stride(row_index_stride) - - plc.io.orc.write_orc(options) - - -cdef int64_t get_skiprows_arg(object arg) except*: - arg = 0 if arg is None else arg - if not isinstance(arg, int) or arg < 0: - raise TypeError("skiprows must be an int >= 0") - return arg - -cdef int64_t get_num_rows_arg(object arg) except*: - arg = -1 if arg is None else arg - if not isinstance(arg, int) or arg < -1: - raise TypeError("num_rows must be an int >= -1") - return arg - - -cdef class ORCWriter: - """ - ORCWriter lets you you incrementally write out a ORC file from a series - of cudf tables - - See Also - -------- - cudf.io.orc.to_orc - """ - cdef bool initialized - cdef OrcChunkedWriter writer - cdef SinkInfo sink - cdef str statistics - cdef object compression - cdef object index - cdef TableInputMetadata tbl_meta - cdef object cols_as_map_type - cdef object stripe_size_bytes - cdef object stripe_size_rows - cdef object row_index_stride - - def __cinit__(self, - object path, - object index=None, - object compression="snappy", - str statistics="ROWGROUP", - object 
cols_as_map_type=None, - object stripe_size_bytes=None, - object stripe_size_rows=None, - object row_index_stride=None): - self.sink = plc.io.SinkInfo([path]) - self.statistics = statistics - self.compression = compression - self.index = index - self.cols_as_map_type = cols_as_map_type \ - if cols_as_map_type is None else set(cols_as_map_type) - self.stripe_size_bytes = stripe_size_bytes - self.stripe_size_rows = stripe_size_rows - self.row_index_stride = row_index_stride - self.initialized = False - - def write_table(self, table): - """ Writes a single table to the file """ - if not self.initialized: - self._initialize_chunked_state(table) - - keep_index = self.index is not False and ( - table._index.name is not None or - isinstance(table._index, cudf.core.multiindex.MultiIndex) - ) - if keep_index: - columns = [ - col.to_pylibcudf(mode="read") - for col in itertools.chain(table.index._columns, table._columns) - ] - else: - columns = [col.to_pylibcudf(mode="read") for col in table._columns] - - self.writer.write(plc.Table(columns)) - - def close(self): - if not self.initialized: - return - - self.writer.close() - - def __dealloc__(self): - self.close() - - def _initialize_chunked_state(self, table): - """ - Prepare all the values required to build the - chunked_orc_writer_options anb creates a writer""" - - num_index_cols_meta = 0 - plc_table = plc.Table( - [ - col.to_pylibcudf(mode="read") - for col in table._columns - ] - ) - self.tbl_meta = TableInputMetadata(plc_table) - if self.index is not False: - if isinstance(table._index, cudf.core.multiindex.MultiIndex): - plc_table = plc.Table( - [ - col.to_pylibcudf(mode="read") - for col in itertools.chain(table.index._columns, table._columns) - ] - ) - self.tbl_meta = TableInputMetadata(plc_table) - for level, idx_name in enumerate(table._index.names): - self.tbl_meta.column_metadata[level].set_name( - idx_name - ) - num_index_cols_meta = len(table._index.names) - else: - if table._index.name is not None: - plc_table = plc.Table( - [ - col.to_pylibcudf(mode="read") - for col in itertools.chain( - table.index._columns, table._columns - ) - ] - ) - self.tbl_meta = TableInputMetadata(plc_table) - self.tbl_meta.column_metadata[0].set_name( - table._index.name - ) - num_index_cols_meta = 1 - - for i, name in enumerate(table._column_names, num_index_cols_meta): - self.tbl_meta.column_metadata[i].set_name(name) - _set_col_children_metadata( - table[name]._column, - self.tbl_meta.column_metadata[i], - (self.cols_as_map_type is not None) - and (name in self.cols_as_map_type), - ) - - user_data = {} - pandas_metadata = generate_pandas_metadata(table, self.index) - user_data["pandas"] = pandas_metadata - - options = ( - plc.io.orc.ChunkedOrcWriterOptions.builder(self.sink) - .metadata(self.tbl_meta) - .key_value_metadata(user_data) - .compression(_get_comp_type(self.compression)) - .enable_statistics(_get_orc_stat_freq(self.statistics)) - .build() - ) - if self.stripe_size_bytes is not None: - options.set_stripe_size_bytes(self.stripe_size_bytes) - if self.stripe_size_rows is not None: - options.set_stripe_size_rows(self.stripe_size_rows) - if self.row_index_stride is not None: - options.set_row_index_stride(self.row_index_stride) - - self.writer = plc.io.orc.OrcChunkedWriter.from_options(options) - - self.initialized = True - -cdef _set_col_children_metadata(Column col, - ColumnInMetadata col_meta, - list_column_as_map=False): - if isinstance(col.dtype, cudf.StructDtype): - for i, (child_col, name) in enumerate( - zip(col.children, 
list(col.dtype.fields)) - ): - col_meta.child(i).set_name(name) - _set_col_children_metadata( - child_col, col_meta.child(i), list_column_as_map - ) - elif isinstance(col.dtype, cudf.ListDtype): - if list_column_as_map: - col_meta.set_list_column_as_map() - _set_col_children_metadata( - col.children[cpp_lists_column_view.child_column_index], - col_meta.child(cpp_lists_column_view.child_column_index), - list_column_as_map - ) - else: - return diff --git a/python/cudf/cudf/io/orc.py b/python/cudf/cudf/io/orc.py index 68b60809bb9..5616413b7e4 100644 --- a/python/cudf/cudf/io/orc.py +++ b/python/cudf/cudf/io/orc.py @@ -1,147 +1,28 @@ # Copyright (c) 2019-2024, NVIDIA CORPORATION. +from __future__ import annotations -import datetime +import itertools import warnings +from typing import TYPE_CHECKING, Literal import pyarrow as pa +import pylibcudf as plc + import cudf -from cudf._lib import orc as liborc +from cudf._lib.types import dtype_to_pylibcudf_type +from cudf._lib.utils import data_from_pylibcudf_io from cudf.api.types import is_list_like +from cudf.core.buffer import acquire_spill_lock from cudf.utils import ioutils +try: + import ujson as json # type: ignore[import-untyped] +except ImportError: + import json -def _make_empty_df(filepath_or_buffer, columns): - from pyarrow import orc - - orc_file = orc.ORCFile(filepath_or_buffer) - schema = orc_file.schema - col_names = schema.names if columns is None else columns - return cudf.DataFrame._from_data( - data={ - col_name: cudf.core.column.column_empty( - row_count=0, - dtype=schema.field(col_name).type.to_pandas_dtype(), - ) - for col_name in col_names - } - ) - - -def _parse_column_statistics(cs, column_statistics_blob): - # Initialize stats to return and parse stats blob - column_statistics = {} - cs.ParseFromString(column_statistics_blob) - - # Load from parsed stats blob into stats to return - if cs.HasField("numberOfValues"): - column_statistics["number_of_values"] = cs.numberOfValues - if cs.HasField("hasNull"): - column_statistics["has_null"] = cs.hasNull - - if cs.HasField("intStatistics"): - column_statistics["minimum"] = ( - cs.intStatistics.minimum - if cs.intStatistics.HasField("minimum") - else None - ) - column_statistics["maximum"] = ( - cs.intStatistics.maximum - if cs.intStatistics.HasField("maximum") - else None - ) - column_statistics["sum"] = ( - cs.intStatistics.sum if cs.intStatistics.HasField("sum") else None - ) - - elif cs.HasField("doubleStatistics"): - column_statistics["minimum"] = ( - cs.doubleStatistics.minimum - if cs.doubleStatistics.HasField("minimum") - else None - ) - column_statistics["maximum"] = ( - cs.doubleStatistics.maximum - if cs.doubleStatistics.HasField("maximum") - else None - ) - column_statistics["sum"] = ( - cs.doubleStatistics.sum - if cs.doubleStatistics.HasField("sum") - else None - ) - - elif cs.HasField("stringStatistics"): - column_statistics["minimum"] = ( - cs.stringStatistics.minimum - if cs.stringStatistics.HasField("minimum") - else None - ) - column_statistics["maximum"] = ( - cs.stringStatistics.maximum - if cs.stringStatistics.HasField("maximum") - else None - ) - column_statistics["sum"] = cs.stringStatistics.sum - - elif cs.HasField("bucketStatistics"): - column_statistics["true_count"] = cs.bucketStatistics.count[0] - column_statistics["false_count"] = ( - column_statistics["number_of_values"] - - column_statistics["true_count"] - ) - - elif cs.HasField("decimalStatistics"): - column_statistics["minimum"] = ( - cs.decimalStatistics.minimum - if 
cs.decimalStatistics.HasField("minimum") - else None - ) - column_statistics["maximum"] = ( - cs.decimalStatistics.maximum - if cs.decimalStatistics.HasField("maximum") - else None - ) - column_statistics["sum"] = cs.decimalStatistics.sum - - elif cs.HasField("dateStatistics"): - column_statistics["minimum"] = ( - datetime.datetime.fromtimestamp( - datetime.timedelta(cs.dateStatistics.minimum).total_seconds(), - datetime.timezone.utc, - ) - if cs.dateStatistics.HasField("minimum") - else None - ) - column_statistics["maximum"] = ( - datetime.datetime.fromtimestamp( - datetime.timedelta(cs.dateStatistics.maximum).total_seconds(), - datetime.timezone.utc, - ) - if cs.dateStatistics.HasField("maximum") - else None - ) - - elif cs.HasField("timestampStatistics"): - # Before ORC-135, the local timezone offset was included and they were - # stored as minimum and maximum. After ORC-135, the timestamp is - # adjusted to UTC before being converted to milliseconds and stored - # in minimumUtc and maximumUtc. - # TODO: Support minimum and maximum by reading writer's local timezone - if cs.timestampStatistics.HasField( - "minimumUtc" - ) and cs.timestampStatistics.HasField("maximumUtc"): - column_statistics["minimum"] = datetime.datetime.fromtimestamp( - cs.timestampStatistics.minimumUtc / 1000, datetime.timezone.utc - ) - column_statistics["maximum"] = datetime.datetime.fromtimestamp( - cs.timestampStatistics.maximumUtc / 1000, datetime.timezone.utc - ) - - elif cs.HasField("binaryStatistics"): - column_statistics["sum"] = cs.binaryStatistics.sum - - return column_statistics +if TYPE_CHECKING: + from cudf.core.column import ColumnBase @ioutils.doc_read_orc_metadata() @@ -175,11 +56,12 @@ def read_orc_statistics( path_or_buf = ioutils._select_single_source( path_or_buf, "read_orc_statistics" ) - ( - column_names, - parsed_file_statistics, - parsed_stripes_statistics, - ) = liborc.read_parsed_orc_statistics(path_or_buf) + parsed = plc.io.orc.read_parsed_orc_statistics( + plc.io.SourceInfo([path_or_buf]) + ) + column_names = parsed.column_names + parsed_file_statistics = parsed.file_stats + parsed_stripes_statistics = parsed.stripes_stats # Parse file statistics file_statistics = { @@ -273,16 +155,14 @@ def read_orc( columns=None, filters=None, stripes=None, - skiprows=None, - num_rows=None, - use_index=True, + skiprows: int | None = None, + num_rows: int | None = None, + use_index: bool = True, timestamp_type=None, storage_options=None, bytes_per_thread=None, ): """{docstring}""" - from cudf import DataFrame - if skiprows is not None: # Do not remove until cuIO team approves its removal. 
warnings.warn( @@ -329,31 +209,132 @@ def read_orc( # Return empty if everything was filtered if len(selected_stripes) == 0: - return _make_empty_df(filepaths_or_buffers[0], columns) + from pyarrow import orc + + orc_file = orc.ORCFile(filepaths_or_buffers[0]) + schema = orc_file.schema + col_names = schema.names if columns is None else columns + return cudf.DataFrame._from_data( + data={ + col_name: cudf.core.column.column_empty( + row_count=0, + dtype=schema.field(col_name).type.to_pandas_dtype(), + ) + for col_name in col_names + } + ) else: stripes = selected_stripes if engine == "cudf": - return DataFrame._from_data( - *liborc.read_orc( - filepaths_or_buffers, - columns, - stripes, - skiprows, - num_rows, - use_index, - timestamp_type, - ) + if columns is not None: + columns = [str(col) for col in columns] + + if skiprows is None: + skiprows = 0 + elif not isinstance(skiprows, int) or skiprows < 0: + raise TypeError("skiprows must be an int >= 0") + + if num_rows is None: + num_rows = -1 + elif not isinstance(num_rows, int) or num_rows < -1: + raise TypeError("num_rows must be an int >= -1") + + tbl_w_meta = plc.io.orc.read_orc( + plc.io.SourceInfo(filepaths_or_buffers), + columns, + stripes, + skiprows, + num_rows, + use_index, + dtype_to_pylibcudf_type(cudf.dtype(timestamp_type)), ) + + if isinstance(columns, list) and len(columns) == 0: + # When `columns=[]`, index needs to be + # established, but not the columns. + nrows = tbl_w_meta.tbl.num_rows() + data = {} + index = cudf.RangeIndex(nrows) + else: + names = tbl_w_meta.column_names(include_children=False) + index_col = None + is_range_index = False + reset_index_name = False + range_idx = None + + if len(tbl_w_meta.per_file_user_data) > 0: + json_str = ( + tbl_w_meta.per_file_user_data[0] + .get(b"pandas", b"") + .decode("utf-8") + ) + if json_str != "": + meta = json.loads(json_str) + if ( + "index_columns" in meta + and len(meta["index_columns"]) > 0 + ): + index_col = meta["index_columns"] + if ( + isinstance(index_col[0], dict) + and index_col[0]["kind"] == "range" + ): + is_range_index = True + else: + index_col_names = {} + for idx_col in index_col: + for c in meta["columns"]: + if c["field_name"] == idx_col: + index_col_names[idx_col] = ( + c["name"] or c["field_name"] + ) + if c["name"] is None: + reset_index_name = True + + actual_index_names = None + col_names = names + if index_col is not None and len(index_col) > 0: + if is_range_index: + range_index_meta = index_col[0] + range_idx = cudf.RangeIndex( + start=range_index_meta["start"], + stop=range_index_meta["stop"], + step=range_index_meta["step"], + name=range_index_meta["name"], + ) + if skiprows != 0: + range_idx = range_idx[skiprows:] + if num_rows != -1: + range_idx = range_idx[:num_rows] + else: + actual_index_names = list(index_col_names.values()) + col_names = names[len(actual_index_names) :] + + data, index = data_from_pylibcudf_io( + tbl_w_meta, + col_names if columns is None else names, + actual_index_names, + ) + + if is_range_index: + index = range_idx + elif reset_index_name: + index.names = [None] * len(index.names) + + child_name_values = tbl_w_meta.child_names.values() + + data = { + name: ioutils._update_col_struct_field_names(col, child_names) + for (name, col), child_names in zip( + data.items(), child_name_values + ) + } + + return cudf.DataFrame._from_data(data, index=index) else: from pyarrow import orc - def read_orc_stripe(orc_file, stripe, columns): - pa_table = orc_file.read_stripe(stripe, columns) - if isinstance(pa_table, 
pa.RecordBatch): - pa_table = pa.Table.from_batches([pa_table]) - return pa_table - warnings.warn("Using CPU via PyArrow to read ORC dataset.") if len(filepath_or_buffer) > 1: raise NotImplementedError( @@ -364,11 +345,18 @@ def read_orc_stripe(orc_file, stripe, columns): orc_file = orc.ORCFile(filepath_or_buffer[0]) if stripes is not None and len(stripes) > 0: for stripe_source_file in stripes: - pa_tables = [ - read_orc_stripe(orc_file, i, columns) + pa_tables = ( + orc_file.read_stripe(i, columns) for i in stripe_source_file - ] - pa_table = pa.concat_tables(pa_tables) + ) + pa_table = pa.concat_tables( + [ + pa.Table.from_batches([table]) + if isinstance(table, pa.RecordBatch) + else table + for table in pa_tables + ] + ) else: pa_table = orc_file.read(columns=columns) df = cudf.DataFrame.from_arrow(pa_table) @@ -378,16 +366,18 @@ def read_orc_stripe(orc_file, stripe, columns): @ioutils.doc_to_orc() def to_orc( - df, + df: cudf.DataFrame, fname, - compression="snappy", - statistics="ROWGROUP", - stripe_size_bytes=None, - stripe_size_rows=None, - row_index_stride=None, + compression: Literal[ + False, None, "SNAPPY", "ZLIB", "ZSTD", "LZ4" + ] = "SNAPPY", + statistics: Literal["NONE", "STRIPE", "ROWGROUP"] = "ROWGROUP", + stripe_size_bytes: int | None = None, + stripe_size_rows: int | None = None, + row_index_stride: int | None = None, cols_as_map_type=None, storage_options=None, - index=None, + index: bool | None = None, ): """{docstring}""" @@ -413,7 +403,7 @@ def to_orc( if ioutils.is_fsspec_open_file(path_or_buf): with path_or_buf as file_obj: file_obj = ioutils.get_IOBase_writer(file_obj) - liborc.write_orc( + _plc_write_orc( df, file_obj, compression, @@ -425,7 +415,7 @@ def to_orc( index, ) else: - liborc.write_orc( + _plc_write_orc( df, path_or_buf, compression, @@ -438,4 +428,279 @@ def to_orc( ) -ORCWriter = liborc.ORCWriter +@acquire_spill_lock() +def _plc_write_orc( + table: cudf.DataFrame, + path_or_buf, + compression: Literal[ + False, None, "SNAPPY", "ZLIB", "ZSTD", "LZ4" + ] = "SNAPPY", + statistics: Literal["NONE", "STRIPE", "ROWGROUP"] = "ROWGROUP", + stripe_size_bytes: int | None = None, + stripe_size_rows: int | None = None, + row_index_stride: int | None = None, + cols_as_map_type=None, + index: bool | None = None, +) -> None: + """ + See `cudf::io::write_orc`. 
+ + See Also + -------- + cudf.read_orc + """ + user_data = {"pandas": ioutils.generate_pandas_metadata(table, index)} + if index is True or ( + index is None and not isinstance(table.index, cudf.RangeIndex) + ): + columns = ( + table._columns + if table.index is None + else itertools.chain(table.index._columns, table._columns) + ) + plc_table = plc.Table( + [col.to_pylibcudf(mode="read") for col in columns] + ) + tbl_meta = plc.io.types.TableInputMetadata(plc_table) + for level, idx_name in enumerate(table._index.names): + tbl_meta.column_metadata[level].set_name( + ioutils._index_level_name(idx_name, level, table._column_names) # type: ignore[arg-type] + ) + num_index_cols_meta = len(table.index.names) + else: + plc_table = plc.Table( + [col.to_pylibcudf(mode="read") for col in table._columns] + ) + tbl_meta = plc.io.types.TableInputMetadata(plc_table) + num_index_cols_meta = 0 + + has_map_type = False + if cols_as_map_type is not None: + cols_as_map_type = set(cols_as_map_type) + has_map_type = True + + for i, (name, col) in enumerate( + table._column_labels_and_values, start=num_index_cols_meta + ): + tbl_meta.column_metadata[i].set_name(name) + _set_col_children_metadata( + col, + tbl_meta.column_metadata[i], + has_map_type and name in cols_as_map_type, + ) + + options = ( + plc.io.orc.OrcWriterOptions.builder( + plc.io.SinkInfo([path_or_buf]), plc_table + ) + .metadata(tbl_meta) + .key_value_metadata(user_data) + .compression(_get_comp_type(compression)) + .enable_statistics(_get_orc_stat_freq(statistics)) + .build() + ) + if stripe_size_bytes is not None: + options.set_stripe_size_bytes(stripe_size_bytes) + if stripe_size_rows is not None: + options.set_stripe_size_rows(stripe_size_rows) + if row_index_stride is not None: + options.set_row_index_stride(row_index_stride) + + plc.io.orc.write_orc(options) + + +class ORCWriter: + """ + ORCWriter lets you you incrementally write out a ORC file from a series + of cudf tables + + See Also + -------- + cudf.io.orc.to_orc + """ + + def __init__( + self, + path, + index: bool | None = None, + compression: Literal[ + False, None, "SNAPPY", "ZLIB", "ZSTD", "LZ4" + ] = "SNAPPY", + statistics: Literal["NONE", "STRIPE", "ROWGROUP"] = "ROWGROUP", + cols_as_map_type=None, + stripe_size_bytes: int | None = None, + stripe_size_rows: int | None = None, + row_index_stride: int | None = None, + ): + self.sink = plc.io.SinkInfo([path]) + self.statistics = statistics + self.compression = compression + self.index = index + self.cols_as_map_type = ( + cols_as_map_type + if cols_as_map_type is None + else set(cols_as_map_type) + ) + self.stripe_size_bytes = stripe_size_bytes + self.stripe_size_rows = stripe_size_rows + self.row_index_stride = row_index_stride + self.initialized = False + + def write_table(self, table): + """Writes a single table to the file""" + if not self.initialized: + self._initialize_chunked_state(table) + + keep_index = self.index is not False and ( + table.index.name is not None + or isinstance(table.index, cudf.MultiIndex) + ) + if keep_index: + cols_to_write = itertools.chain( + table.index._columns, table._columns + ) + else: + cols_to_write = table._columns + + self.writer.write( + plc.Table([col.to_pylibcudf(mode="read") for col in cols_to_write]) + ) + + def close(self): + if not self.initialized: + return + self.writer.close() + + def _initialize_chunked_state(self, table): + """ + Prepare all the values required to build the + chunked_orc_writer_options anb creates a writer + """ + + num_index_cols_meta = 0 + plc_table = 
plc.Table( + [col.to_pylibcudf(mode="read") for col in table._columns] + ) + self.tbl_meta = plc.io.types.TableInputMetadata(plc_table) + if self.index is not False: + if isinstance(table.index, cudf.MultiIndex): + plc_table = plc.Table( + [ + col.to_pylibcudf(mode="read") + for col in itertools.chain( + table.index._columns, table._columns + ) + ] + ) + self.tbl_meta = plc.io.types.TableInputMetadata(plc_table) + for level, idx_name in enumerate(table.index.names): + self.tbl_meta.column_metadata[level].set_name(idx_name) + num_index_cols_meta = len(table.index.names) + else: + if table.index.name is not None: + plc_table = plc.Table( + [ + col.to_pylibcudf(mode="read") + for col in itertools.chain( + table.index._columns, table._columns + ) + ] + ) + self.tbl_meta = plc.io.types.TableInputMetadata(plc_table) + self.tbl_meta.column_metadata[0].set_name(table.index.name) + num_index_cols_meta = 1 + + has_map_type = self.cols_as_map_type is not None + for i, (name, col) in enumerate( + table._column_labels_and_values, start=num_index_cols_meta + ): + self.tbl_meta.column_metadata[i].set_name(name) + _set_col_children_metadata( + col, + self.tbl_meta.column_metadata[i], + has_map_type and name in self.cols_as_map_type, + ) + + user_data = { + "pandas": ioutils.generate_pandas_metadata(table, self.index) + } + + options = ( + plc.io.orc.ChunkedOrcWriterOptions.builder(self.sink) + .metadata(self.tbl_meta) + .key_value_metadata(user_data) + .compression(_get_comp_type(self.compression)) + .enable_statistics(_get_orc_stat_freq(self.statistics)) + .build() + ) + if self.stripe_size_bytes is not None: + options.set_stripe_size_bytes(self.stripe_size_bytes) + if self.stripe_size_rows is not None: + options.set_stripe_size_rows(self.stripe_size_rows) + if self.row_index_stride is not None: + options.set_row_index_stride(self.row_index_stride) + + self.writer = plc.io.orc.OrcChunkedWriter.from_options(options) + + self.initialized = True + + +def _get_comp_type( + compression: Literal[False, None, "SNAPPY", "ZLIB", "ZSTD", "LZ4"], +) -> plc.io.types.CompressionType: + if compression is None or compression is False: + return plc.io.types.CompressionType.NONE + + normed_compression = compression.upper() + if normed_compression == "SNAPPY": + return plc.io.types.CompressionType.SNAPPY + elif normed_compression == "ZLIB": + return plc.io.types.CompressionType.ZLIB + elif normed_compression == "ZSTD": + return plc.io.types.CompressionType.ZSTD + elif normed_compression == "LZ4": + return plc.io.types.CompressionType.LZ4 + else: + raise ValueError(f"Unsupported `compression` type {compression}") + + +def _get_orc_stat_freq( + statistics: Literal["NONE", "STRIPE", "ROWGROUP"], +) -> plc.io.types.StatisticsFreq: + """ + Convert ORC statistics terms to CUDF convention: + - ORC "STRIPE" == CUDF "ROWGROUP" + - ORC "ROWGROUP" == CUDF "PAGE" + """ + normed_statistics = statistics.upper() + if normed_statistics == "NONE": + return plc.io.types.StatisticsFreq.STATISTICS_NONE + elif normed_statistics == "STRIPE": + return plc.io.types.StatisticsFreq.STATISTICS_ROWGROUP + elif normed_statistics == "ROWGROUP": + return plc.io.types.StatisticsFreq.STATISTICS_PAGE + else: + raise ValueError(f"Unsupported `statistics_freq` type {statistics}") + + +def _set_col_children_metadata( + col: ColumnBase, + col_meta: plc.io.types.ColumnInMetadata, + list_column_as_map: bool = False, +) -> None: + if isinstance(col.dtype, cudf.StructDtype): + for i, (child_col, name) in enumerate( + zip(col.children, list(col.dtype.fields)) + ): 
+ col_meta.child(i).set_name(name) + _set_col_children_metadata( + child_col, col_meta.child(i), list_column_as_map + ) + elif isinstance(col.dtype, cudf.ListDtype): + if list_column_as_map: + col_meta.set_list_column_as_map() + _set_col_children_metadata( + col.children[1], col_meta.child(1), list_column_as_map + ) + else: + return diff --git a/python/cudf/cudf/utils/ioutils.py b/python/cudf/cudf/utils/ioutils.py index 5681601d2be..d9a3da6666d 100644 --- a/python/cudf/cudf/utils/ioutils.py +++ b/python/cudf/cudf/utils/ioutils.py @@ -3,37 +3,45 @@ import datetime import functools +import json import operator import os import urllib import warnings from io import BufferedWriter, BytesIO, IOBase, TextIOWrapper from threading import Thread -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any import fsspec import fsspec.implementations.local import numpy as np import pandas as pd +import pyarrow as pa from fsspec.core import expand_paths_if_needed, get_fs_token_paths import cudf from cudf.api.types import is_list_like from cudf.core._compat import PANDAS_LT_300 from cudf.utils.docutils import docfmt_partial +from cudf.utils.dtypes import np_dtypes_to_pandas_dtypes, np_to_pa_dtype try: import fsspec.parquet as fsspec_parquet - except ImportError: fsspec_parquet = None + if TYPE_CHECKING: - from collections.abc import Callable + from collections.abc import Callable, Hashable from cudf.core.column import ColumnBase +PARQUET_META_TYPE_MAP = { + str(cudf_dtype): str(pandas_dtype) + for cudf_dtype, pandas_dtype in np_dtypes_to_pandas_dtypes.items() +} + _BYTES_PER_THREAD_DEFAULT = 256 * 1024 * 1024 _ROW_GROUP_SIZE_BYTES_DEFAULT = np.iinfo(np.uint64).max @@ -1487,6 +1495,153 @@ ) +def _index_level_name( + index_name: Hashable, level: int, column_names: list[Hashable] +) -> Hashable: + """ + Return the name of an index level or a default name + if `index_name` is None or is already a column name. + + Parameters + ---------- + index_name : name of an Index object + level : level of the Index object + + Returns + ------- + name : str + """ + if index_name is not None and index_name not in column_names: + return index_name + else: + return f"__index_level_{level}__" + + +def generate_pandas_metadata(table: cudf.DataFrame, index: bool | None) -> str: + col_names: list[Hashable] = [] + types = [] + index_levels = [] + index_descriptors = [] + columns_to_convert = list(table._columns) + # Columns + for name, col in table._column_labels_and_values: + if cudf.get_option("mode.pandas_compatible"): + # in pandas-compat mode, non-string column names are stringified. + col_names.append(str(name)) + else: + col_names.append(name) + + if isinstance(col.dtype, cudf.CategoricalDtype): + raise ValueError( + "'category' column dtypes are currently not " + + "supported by the gpu accelerated parquet writer" + ) + elif isinstance( + col.dtype, + (cudf.ListDtype, cudf.StructDtype, cudf.core.dtypes.DecimalDtype), + ): + types.append(col.dtype.to_arrow()) + else: + # A boolean element takes 8 bits in cudf and 1 bit in + # pyarrow. To make sure the cudf format is interoperable + # with arrow, we use `int8` type when converting from a + # cudf boolean array. 
+ if col.dtype.type == np.bool_: + types.append(pa.int8()) + else: + types.append(np_to_pa_dtype(col.dtype)) + + # Indexes + materialize_index = False + if index is not False: + for level, name in enumerate(table.index.names): + if isinstance(table.index, cudf.MultiIndex): + idx = table.index.get_level_values(level) + else: + idx = table.index + + if isinstance(idx, cudf.RangeIndex): + if index is None: + descr: dict[str, Any] | Hashable = { + "kind": "range", + "name": table.index.name, + "start": table.index.start, + "stop": table.index.stop, + "step": table.index.step, + } + else: + materialize_index = True + # When `index=True`, RangeIndex needs to be materialized. + materialized_idx = idx._as_int_index() + descr = _index_level_name( + index_name=materialized_idx.name, + level=level, + column_names=col_names, + ) + index_levels.append(materialized_idx) + columns_to_convert.append(materialized_idx._values) + col_names.append(descr) + types.append(np_to_pa_dtype(materialized_idx.dtype)) + else: + descr = _index_level_name( + index_name=idx.name, level=level, column_names=col_names + ) + columns_to_convert.append(idx._values) + col_names.append(descr) + if isinstance(idx.dtype, cudf.CategoricalDtype): + raise ValueError( + "'category' column dtypes are currently not " + + "supported by the gpu accelerated parquet writer" + ) + elif isinstance(idx.dtype, cudf.ListDtype): + types.append(col.dtype.to_arrow()) + else: + # A boolean element takes 8 bits in cudf and 1 bit in + # pyarrow. To make sure the cudf format is interperable + # in arrow, we use `int8` type when converting from a + # cudf boolean array. + if idx.dtype.type == np.bool_: + types.append(pa.int8()) + else: + types.append(np_to_pa_dtype(idx.dtype)) + + index_levels.append(idx) + index_descriptors.append(descr) + + df_meta = table.head(0) + if materialize_index: + df_meta.index = df_meta.index._as_int_index() + metadata = pa.pandas_compat.construct_metadata( + columns_to_convert=columns_to_convert, + # It is OKAY to do `.head(0).to_pandas()` because + # this method will extract `.columns` metadata only + df=df_meta.to_pandas(), + column_names=col_names, + index_levels=index_levels, + index_descriptors=index_descriptors, + preserve_index=index, + types=types, + ) + + md_dict = json.loads(metadata[b"pandas"]) + + # correct metadata for list and struct and nullable numeric types + for col_meta in md_dict["columns"]: + if ( + col_meta["name"] in table._column_names + and table._data[col_meta["name"]].nullable + and col_meta["numpy_type"] in PARQUET_META_TYPE_MAP + and col_meta["pandas_type"] != "decimal" + ): + col_meta["numpy_type"] = PARQUET_META_TYPE_MAP[ + col_meta["numpy_type"] + ] + if col_meta["numpy_type"] in ("list", "struct"): + col_meta["numpy_type"] = "object" + + return json.dumps(md_dict) + + def is_url(url): """Check if a string is a valid URL to a network location. 
diff --git a/python/pylibcudf/pylibcudf/io/types.pxd b/python/pylibcudf/pylibcudf/io/types.pxd index a1f3b17936c..61fe33d6805 100644 --- a/python/pylibcudf/pylibcudf/io/types.pxd +++ b/python/pylibcudf/pylibcudf/io/types.pxd @@ -65,7 +65,6 @@ cdef class ColumnInMetadata: cdef class TableInputMetadata: cdef table_input_metadata c_obj - cdef list column_metadata cdef class TableWithMetadata: cdef public Table tbl diff --git a/python/pylibcudf/pylibcudf/io/types.pyi b/python/pylibcudf/pylibcudf/io/types.pyi index a3a559219ff..63fa9d1ff79 100644 --- a/python/pylibcudf/pylibcudf/io/types.pyi +++ b/python/pylibcudf/pylibcudf/io/types.pyi @@ -64,6 +64,8 @@ class PartitionInfo: class TableInputMetadata: def __init__(self, table: Table): ... + @property + def column_metadata(self) -> list[ColumnInMetadata]: ... class ColumnInMetadata: def set_name(self, name: str) -> Self: ... diff --git a/python/pylibcudf/pylibcudf/io/types.pyx b/python/pylibcudf/pylibcudf/io/types.pyx index a2155829f2c..458595ca0e0 100644 --- a/python/pylibcudf/pylibcudf/io/types.pyx +++ b/python/pylibcudf/pylibcudf/io/types.pyx @@ -288,12 +288,14 @@ cdef class TableInputMetadata: """ def __init__(self, Table table): self.c_obj = table_input_metadata(table.view()) - self.column_metadata = [ + + @property + def column_metadata(self): + return [ ColumnInMetadata.from_libcudf(&self.c_obj.column_metadata[i], self) for i in range(self.c_obj.column_metadata.size()) ] - cdef class TableWithMetadata: """A container holding a table and its associated metadata (e.g. column names) From cbeefd8f4e4e67f52331131039533ef1f0ea0a65 Mon Sep 17 00:00:00 2001 From: Matthew Murray <41342305+Matt711@users.noreply.github.com> Date: Fri, 6 Dec 2024 16:54:54 -0500 Subject: [PATCH 2/7] Add Parquet Reader options classes to pylibcudf (#17464) Follow up of #17263, this PR adds the parquet reader options classes to pylibcudf and plumbs the changes through cudf python. Authors: - Matthew Murray (https://github.com/Matt711) Approvers: - Matthew Roeschke (https://github.com/mroeschke) - Yunsong Wang (https://github.com/PointKernel) - Nghia Truong (https://github.com/ttnghia) - MithunR (https://github.com/mythrocks) URL: https://github.com/rapidsai/cudf/pull/17464 --- cpp/include/cudf/io/parquet.hpp | 1 + python/cudf/cudf/_lib/parquet.pyx | 58 +-- python/cudf_polars/cudf_polars/dsl/ir.py | 44 ++- python/pylibcudf/pylibcudf/io/parquet.pxd | 36 +- python/pylibcudf/pylibcudf/io/parquet.pyi | 21 +- python/pylibcudf/pylibcudf/io/parquet.pyx | 339 +++++++++++------- .../pylibcudf/tests/io/test_parquet.py | 28 +- 7 files changed, 333 insertions(+), 194 deletions(-) diff --git a/cpp/include/cudf/io/parquet.hpp b/cpp/include/cudf/io/parquet.hpp index bfe76d5690c..b561d0989e9 100644 --- a/cpp/include/cudf/io/parquet.hpp +++ b/cpp/include/cudf/io/parquet.hpp @@ -410,6 +410,7 @@ class parquet_reader_options_builder { * * @param val Boolean value whether to read matching projected and filter columns from mismatched * Parquet sources. + * * @return this for chaining. 
*/ parquet_reader_options_builder& allow_mismatched_pq_schemas(bool val) diff --git a/python/cudf/cudf/_lib/parquet.pyx b/python/cudf/cudf/_lib/parquet.pyx index c77c9875342..1b4c18d13a7 100644 --- a/python/cudf/cudf/_lib/parquet.pyx +++ b/python/cudf/cudf/_lib/parquet.pyx @@ -205,7 +205,7 @@ cdef object _process_metadata(object df, else: start = range_index_meta["start"] + skip_rows stop = range_index_meta["stop"] - if nrows != -1: + if nrows > -1: stop = start + nrows idx = cudf.RangeIndex( start=start, @@ -256,16 +256,27 @@ def read_parquet_chunked( # (see read_parquet) allow_range_index = columns is not None and len(columns) != 0 + options = ( + plc.io.parquet.ParquetReaderOptions.builder( + plc.io.SourceInfo(filepaths_or_buffers) + ) + .use_pandas_metadata(use_pandas_metadata) + .allow_mismatched_pq_schemas(allow_mismatched_pq_schemas) + .build() + ) + if row_groups is not None: + options.set_row_groups(row_groups) + if nrows > -1: + options.set_num_rows(nrows) + if skip_rows != 0: + options.set_skip_rows(skip_rows) + if columns is not None: + options.set_columns(columns) + reader = ChunkedParquetReader( - plc.io.SourceInfo(filepaths_or_buffers), - columns, - row_groups, - use_pandas_metadata=use_pandas_metadata, + options, chunk_read_limit=chunk_read_limit, pass_read_limit=pass_read_limit, - skip_rows=skip_rows, - nrows=nrows, - allow_mismatched_pq_schemas=allow_mismatched_pq_schemas, ) tbl_w_meta = reader.read_chunk() @@ -325,19 +336,26 @@ cpdef read_parquet(filepaths_or_buffers, columns=None, row_groups=None, if columns is not None and len(columns) == 0 or filters: allow_range_index = False - # Read Parquet - - tbl_w_meta = plc.io.parquet.read_parquet( - plc.io.SourceInfo(filepaths_or_buffers), - columns, - row_groups, - filters, - convert_strings_to_categories = False, - use_pandas_metadata = use_pandas_metadata, - skip_rows = skip_rows, - nrows = nrows, - allow_mismatched_pq_schemas=allow_mismatched_pq_schemas, + options = ( + plc.io.parquet.ParquetReaderOptions.builder( + plc.io.SourceInfo(filepaths_or_buffers) + ) + .use_pandas_metadata(use_pandas_metadata) + .allow_mismatched_pq_schemas(allow_mismatched_pq_schemas) + .build() ) + if row_groups is not None: + options.set_row_groups(row_groups) + if nrows > -1: + options.set_num_rows(nrows) + if skip_rows != 0: + options.set_skip_rows(skip_rows) + if columns is not None: + options.set_columns(columns) + if filters is not None: + options.set_filter(filters) + + tbl_w_meta = plc.io.parquet.read_parquet(options) df = cudf.DataFrame._from_data( *data_from_pylibcudf_io(tbl_w_meta) diff --git a/python/cudf_polars/cudf_polars/dsl/ir.py b/python/cudf_polars/cudf_polars/dsl/ir.py index 1faa778ccf6..b5af3bb80bf 100644 --- a/python/cudf_polars/cudf_polars/dsl/ir.py +++ b/python/cudf_polars/cudf_polars/dsl/ir.py @@ -517,17 +517,22 @@ def do_evaluate( elif typ == "parquet": parquet_options = config_options.get("parquet_options", {}) if parquet_options.get("chunked", True): + options = plc.io.parquet.ParquetReaderOptions.builder( + plc.io.SourceInfo(paths) + ).build() + # We handle skip_rows != 0 by reading from the + # up to n_rows + skip_rows and slicing off the + # first skip_rows entries. 
+ # TODO: Remove this workaround once + # https://github.com/rapidsai/cudf/issues/16186 + # is fixed + nrows = n_rows + skip_rows + if nrows > -1: + options.set_num_rows(nrows) + if with_columns is not None: + options.set_columns(with_columns) reader = plc.io.parquet.ChunkedParquetReader( - plc.io.SourceInfo(paths), - columns=with_columns, - # We handle skip_rows != 0 by reading from the - # up to n_rows + skip_rows and slicing off the - # first skip_rows entries. - # TODO: Remove this workaround once - # https://github.com/rapidsai/cudf/issues/16186 - # is fixed - nrows=n_rows + skip_rows, - skip_rows=0, + options, chunk_read_limit=parquet_options.get( "chunk_read_limit", cls.PARQUET_DEFAULT_CHUNK_SIZE ), @@ -573,13 +578,18 @@ def slice_skip(tbl: plc.Table): if predicate is not None and row_index is None: # Can't apply filters during read if we have a row index. filters = to_parquet_filter(predicate.value) - tbl_w_meta = plc.io.parquet.read_parquet( - plc.io.SourceInfo(paths), - columns=with_columns, - filters=filters, - nrows=n_rows, - skip_rows=skip_rows, - ) + options = plc.io.parquet.ParquetReaderOptions.builder( + plc.io.SourceInfo(paths) + ).build() + if n_rows != -1: + options.set_num_rows(n_rows) + if skip_rows != 0: + options.set_skip_rows(skip_rows) + if with_columns is not None: + options.set_columns(with_columns) + if filters is not None: + options.set_filter(filters) + tbl_w_meta = plc.io.parquet.read_parquet(options) df = DataFrame.from_table( tbl_w_meta.tbl, # TODO: consider nested column names? diff --git a/python/pylibcudf/pylibcudf/io/parquet.pxd b/python/pylibcudf/pylibcudf/io/parquet.pxd index 7bd6ba91ca9..84f47cf5305 100644 --- a/python/pylibcudf/pylibcudf/io/parquet.pxd +++ b/python/pylibcudf/pylibcudf/io/parquet.pxd @@ -19,6 +19,8 @@ from pylibcudf.libcudf.io.parquet cimport ( chunked_parquet_reader as cpp_chunked_parquet_reader, parquet_writer_options, parquet_writer_options_builder, + parquet_reader_options, + parquet_reader_options_builder, chunked_parquet_writer_options, chunked_parquet_writer_options_builder, ) @@ -27,6 +29,25 @@ from pylibcudf.table cimport Table from pylibcudf.types cimport DataType +cdef class ParquetReaderOptions: + cdef parquet_reader_options c_obj + cdef SourceInfo source + cpdef void set_row_groups(self, list row_groups) + cpdef void set_num_rows(self, size_type nrows) + cpdef void set_skip_rows(self, int64_t skip_rows) + cpdef void set_columns(self, list col_names) + cpdef void set_filter(self, Expression filter) + +cdef class ParquetReaderOptionsBuilder: + cdef parquet_reader_options_builder c_obj + cdef SourceInfo source + cpdef ParquetReaderOptionsBuilder convert_strings_to_categories(self, bool val) + cpdef ParquetReaderOptionsBuilder use_pandas_metadata(self, bool val) + cpdef ParquetReaderOptionsBuilder allow_mismatched_pq_schemas(self, bool val) + cpdef ParquetReaderOptionsBuilder use_arrow_schema(self, bool val) + cpdef build(self) + + cdef class ChunkedParquetReader: cdef unique_ptr[cpp_chunked_parquet_reader] reader @@ -34,20 +55,7 @@ cdef class ChunkedParquetReader: cpdef TableWithMetadata read_chunk(self) -cpdef read_parquet( - SourceInfo source_info, - list columns = *, - list row_groups = *, - Expression filters = *, - bool convert_strings_to_categories = *, - bool use_pandas_metadata = *, - int64_t skip_rows = *, - size_type nrows = *, - bool allow_mismatched_pq_schemas = *, - # disabled see comment in parquet.pyx for more - # ReaderColumnSchema reader_column_schema = *, - # DataType timestamp_type = * -) +cpdef 
read_parquet(ParquetReaderOptions options) cdef class ParquetChunkedWriter: diff --git a/python/pylibcudf/pylibcudf/io/parquet.pyi b/python/pylibcudf/pylibcudf/io/parquet.pyi index 22bea1abd8e..2d8d12c1a45 100644 --- a/python/pylibcudf/pylibcudf/io/parquet.pyi +++ b/python/pylibcudf/pylibcudf/io/parquet.pyi @@ -1,7 +1,8 @@ # Copyright (c) 2024, NVIDIA CORPORATION. from collections.abc import Mapping -from typing import Self + +from typing_extensions import Self from pylibcudf.expressions import Expression from pylibcudf.io.types import ( @@ -16,6 +17,24 @@ from pylibcudf.io.types import ( ) from pylibcudf.table import Table +class ParquetReaderOptions: + def __init__(self): ... + def set_row_groups(self, row_groups: list[list[int]]): ... + def set_num_rows(self, nrows: int): ... + def set_skip_rows(self, skip_rows: int): ... + def set_columns(self, col_names: list[str]): ... + def set_filter(self, filter: Expression): ... + @staticmethod + def builder(source: SourceInfo) -> ParquetReaderOptionsBuilder: ... + +class ParquetReaderOptionsBuilder: + def __init__(self): ... + def convert_strings_to_categories(self, val: bool) -> Self: ... + def use_pandas_metadata(self, val: bool) -> Self: ... + def allow_mismatched_pq_schemas(self, val: bool) -> Self: ... + def use_arrow_schema(self, val: bool) -> Self: ... + def build(self) -> ParquetReaderOptions: ... + class ChunkedParquetReader: def __init__( self, diff --git a/python/pylibcudf/pylibcudf/io/parquet.pyx b/python/pylibcudf/pylibcudf/io/parquet.pyx index 9bdf849a30c..672fe2be847 100644 --- a/python/pylibcudf/pylibcudf/io/parquet.pyx +++ b/python/pylibcudf/pylibcudf/io/parquet.pyx @@ -42,47 +42,204 @@ __all__ = [ "ParquetWriterOptionsBuilder", "read_parquet", "write_parquet", + "ParquetReaderOptions", + "ParquetReaderOptionsBuilder", "ChunkedParquetWriterOptions", "ChunkedParquetWriterOptionsBuilder" "merge_row_group_metadata", ] -cdef parquet_reader_options _setup_parquet_reader_options( - SourceInfo source_info, - list columns = None, - list row_groups = None, - Expression filters = None, - bool convert_strings_to_categories = False, - bool use_pandas_metadata = True, - int64_t skip_rows = 0, - size_type nrows = -1, - bool allow_mismatched_pq_schemas=False, - # ReaderColumnSchema reader_column_schema = None, - # DataType timestamp_type = DataType(type_id.EMPTY) -): - cdef vector[string] col_vec - cdef parquet_reader_options opts = ( - parquet_reader_options.builder(source_info.c_obj) - .convert_strings_to_categories(convert_strings_to_categories) - .use_pandas_metadata(use_pandas_metadata) - .allow_mismatched_pq_schemas(allow_mismatched_pq_schemas) - .use_arrow_schema(True) - .build() - ) - if row_groups is not None: - opts.set_row_groups(row_groups) - if nrows != -1: - opts.set_num_rows(nrows) - if skip_rows != 0: - opts.set_skip_rows(skip_rows) - if columns is not None: - col_vec.reserve(len(columns)) - for col in columns: - col_vec.push_back(str(col).encode()) - opts.set_columns(col_vec) - if filters is not None: - opts.set_filter(dereference(filters.c_obj.get())) - return opts + +cdef class ParquetReaderOptions: + """The settings to use for ``read_parquet`` + For details, see :cpp:class:`cudf::io::parquet_reader_options` + """ + @staticmethod + def builder(SourceInfo source): + """ + Create a ParquetReaderOptionsBuilder object + + For details, see :cpp:func:`cudf::io::parquet_reader_options::builder` + + Parameters + ---------- + sink : SourceInfo + The source to read the Parquet file from. 
+ + Returns + ------- + ParquetReaderOptionsBuilder + Builder to build ParquetReaderOptions + """ + cdef ParquetReaderOptionsBuilder parquet_builder = ( + ParquetReaderOptionsBuilder.__new__(ParquetReaderOptionsBuilder) + ) + parquet_builder.c_obj = parquet_reader_options.builder(source.c_obj) + parquet_builder.source = source + return parquet_builder + + cpdef void set_row_groups(self, list row_groups): + """ + Sets list of individual row groups to read. + + Parameters + ---------- + row_groups : list + List of row groups to read + + Returns + ------- + None + """ + cdef vector[vector[size_type]] outer + cdef vector[size_type] inner + for row_group in row_groups: + for x in row_group: + inner.push_back(x) + outer.push_back(inner) + inner.clear() + + self.c_obj.set_row_groups(outer) + + cpdef void set_num_rows(self, size_type nrows): + """ + Sets number of rows to read. + + Parameters + ---------- + nrows : size_type + Number of rows to read after skip + + Returns + ------- + None + """ + self.c_obj.set_num_rows(nrows) + + cpdef void set_skip_rows(self, int64_t skip_rows): + """ + Sets number of rows to skip. + + Parameters + ---------- + skip_rows : int64_t + Number of rows to skip from start + + Returns + ------- + None + """ + self.c_obj.set_skip_rows(skip_rows) + + cpdef void set_columns(self, list col_names): + """ + Sets names of the columns to be read. + + Parameters + ---------- + col_names : list + List of column names + + Returns + ------- + None + """ + cdef vector[string] vec + for name in col_names: + vec.push_back(str(name).encode()) + self.c_obj.set_columns(vec) + + cpdef void set_filter(self, Expression filter): + """ + Sets AST based filter for predicate pushdown. + + Parameters + ---------- + filter : Expression + AST expression to use as filter + + Returns + ------- + None + """ + self.c_obj.set_filter(dereference(filter.c_obj.get())) + + +cdef class ParquetReaderOptionsBuilder: + cpdef ParquetReaderOptionsBuilder convert_strings_to_categories(self, bool val): + """ + Sets enable/disable conversion of strings to categories. + + Parameters + ---------- + val : bool + Boolean value to enable/disable conversion of string columns to categories + + Returns + ------- + ParquetReaderOptionsBuilder + """ + self.c_obj.convert_strings_to_categories(val) + return self + + cpdef ParquetReaderOptionsBuilder use_pandas_metadata(self, bool val): + """ + Sets to enable/disable use of pandas metadata to read. + + Parameters + ---------- + val : bool + Boolean value whether to use pandas metadata + + Returns + ------- + ParquetReaderOptionsBuilder + """ + self.c_obj.use_pandas_metadata(val) + return self + + cpdef ParquetReaderOptionsBuilder allow_mismatched_pq_schemas(self, bool val): + """ + Sets to enable/disable reading of matching projected and filter + columns from mismatched Parquet sources. + + Parameters + ---------- + val : bool + Boolean value whether to read matching projected and filter + columns from mismatched Parquet sources. + + Returns + ------- + ParquetReaderOptionsBuilder + """ + self.c_obj.allow_mismatched_pq_schemas(val) + return self + + cpdef ParquetReaderOptionsBuilder use_arrow_schema(self, bool val): + """ + Sets to enable/disable use of arrow schema to read. 
+ + Parameters + ---------- + val : bool + Boolean value whether to use arrow schema + + Returns + ------- + ParquetReaderOptionsBuilder + """ + self.c_obj.use_arrow_schema(val) + return self + + cpdef build(self): + """Create a ParquetReaderOptions object""" + cdef ParquetReaderOptions parquet_options = ParquetReaderOptions.__new__( + ParquetReaderOptions + ) + parquet_options.c_obj = move(self.c_obj.build()) + parquet_options.source = self.source + return parquet_options cdef class ChunkedParquetReader: @@ -93,63 +250,27 @@ cdef class ChunkedParquetReader: Parameters ---------- - source_info : SourceInfo - The SourceInfo object to read the Parquet file from. - columns : list, default None - The names of the columns to be read - row_groups : list[list[size_type]], default None - List of row groups to be read. - use_pandas_metadata : bool, default True - If True, return metadata about the index column in - the per-file user metadata of the ``TableWithMetadata`` - convert_strings_to_categories : bool, default False - Whether to convert string columns to the category type - skip_rows : int64_t, default 0 - The number of rows to skip from the start of the file. - nrows : size_type, default -1 - The number of rows to read. By default, read the entire file. + options : ParquetReaderOptions + Settings for controlling reading behavior chunk_read_limit : size_t, default 0 Limit on total number of bytes to be returned per read, or 0 if there is no limit. pass_read_limit : size_t, default 1024000000 Limit on the amount of memory used for reading and decompressing data or 0 if there is no limit. - allow_mismatched_pq_schemas : bool, default False - Whether to read (matching) columns specified in `columns` from - the input files with otherwise mismatched schemas. """ def __init__( self, - SourceInfo source_info, - list columns=None, - list row_groups=None, - bool use_pandas_metadata=True, - bool convert_strings_to_categories=False, - int64_t skip_rows = 0, - size_type nrows = -1, + ParquetReaderOptions options, size_t chunk_read_limit=0, size_t pass_read_limit=1024000000, - bool allow_mismatched_pq_schemas=False ): - - cdef parquet_reader_options opts = _setup_parquet_reader_options( - source_info, - columns, - row_groups, - filters=None, - convert_strings_to_categories=convert_strings_to_categories, - use_pandas_metadata=use_pandas_metadata, - skip_rows=skip_rows, - nrows=nrows, - allow_mismatched_pq_schemas=allow_mismatched_pq_schemas, - ) - with nogil: self.reader.reset( new cpp_chunked_parquet_reader( chunk_read_limit, pass_read_limit, - opts + options.c_obj, ) ) @@ -184,69 +305,23 @@ cdef class ChunkedParquetReader: return TableWithMetadata.from_libcudf(c_result) -cpdef read_parquet( - SourceInfo source_info, - list columns = None, - list row_groups = None, - Expression filters = None, - bool convert_strings_to_categories = False, - bool use_pandas_metadata = True, - int64_t skip_rows = 0, - size_type nrows = -1, - bool allow_mismatched_pq_schemas = False, - # Disabled, these aren't used by cudf-python - # we should only add them back in if there's user demand - # ReaderColumnSchema reader_column_schema = None, - # DataType timestamp_type = DataType(type_id.EMPTY) -): - """Reads an Parquet file into a :py:class:`~.types.TableWithMetadata`. + +cpdef read_parquet(ParquetReaderOptions options): + """ + Read from Parquet format. + + The source to read from and options are encapsulated + by the `options` object. For details, see :cpp:func:`read_parquet`. 
Parameters ---------- - source_info : SourceInfo - The SourceInfo object to read the Parquet file from. - columns : list, default None - The string names of the columns to be read. - row_groups : list[list[size_type]], default None - List of row groups to be read. - filters : Expression, default None - An AST :py:class:`pylibcudf.expressions.Expression` - to use for predicate pushdown. - convert_strings_to_categories : bool, default False - Whether to convert string columns to the category type - use_pandas_metadata : bool, default True - If True, return metadata about the index column in - the per-file user metadata of the ``TableWithMetadata`` - skip_rows : int64_t, default 0 - The number of rows to skip from the start of the file. - nrows : size_type, default -1 - The number of rows to read. By default, read the entire file. - allow_mismatched_pq_schemas : bool, default False - If True, enable reading (matching) columns specified in `columns` - from the input files with otherwise mismatched schemas. - - Returns - ------- - TableWithMetadata - The Table and its corresponding metadata (column names) that were read in. + options: ParquetReaderOptions + Settings for controlling reading behavior """ - cdef table_with_metadata c_result - cdef parquet_reader_options opts = _setup_parquet_reader_options( - source_info, - columns, - row_groups, - filters, - convert_strings_to_categories, - use_pandas_metadata, - skip_rows, - nrows, - allow_mismatched_pq_schemas, - ) - with nogil: - c_result = move(cpp_read_parquet(opts)) + c_result = move(cpp_read_parquet(options.c_obj)) return TableWithMetadata.from_libcudf(c_result) diff --git a/python/pylibcudf/pylibcudf/tests/io/test_parquet.py b/python/pylibcudf/pylibcudf/tests/io/test_parquet.py index 94524acbcc8..da535809745 100644 --- a/python/pylibcudf/pylibcudf/tests/io/test_parquet.py +++ b/python/pylibcudf/pylibcudf/tests/io/test_parquet.py @@ -31,19 +31,24 @@ def test_read_parquet_basic( binary_source_or_sink, pa_table, **_COMMON_PARQUET_SOURCE_KWARGS ) - res = plc.io.parquet.read_parquet( - plc.io.SourceInfo([source]), - nrows=nrows, - skip_rows=skiprows, - columns=columns, - ) + options = plc.io.parquet.ParquetReaderOptions.builder( + plc.io.SourceInfo([source]) + ).build() + if nrows > -1: + options.set_num_rows(nrows) + if skiprows != 0: + options.set_skip_rows(skiprows) + if columns is not None: + options.set_columns(columns) + + res = plc.io.parquet.read_parquet(options) if columns is not None: pa_table = pa_table.select(columns) # Adapt to nrows/skiprows pa_table = pa_table.slice( - offset=skiprows, length=nrows if nrows != -1 else None + offset=skiprows, length=nrows if nrows > -1 else None ) assert_table_and_meta_eq(pa_table, res, check_field_nullability=False) @@ -95,9 +100,12 @@ def test_read_parquet_filters( binary_source_or_sink, pa_table, **_COMMON_PARQUET_SOURCE_KWARGS ) - plc_table_w_meta = plc.io.parquet.read_parquet( - plc.io.SourceInfo([source]), filters=plc_filters - ) + options = plc.io.parquet.ParquetReaderOptions.builder( + plc.io.SourceInfo([source]) + ).build() + options.set_filter(plc_filters) + + plc_table_w_meta = plc.io.parquet.read_parquet(options) exp = read_table(source, filters=pa_filters) assert_table_and_meta_eq( exp, plc_table_w_meta, check_field_nullability=False From 14b4321b5172104c5d9801e196e607e3bb0c4c39 Mon Sep 17 00:00:00 2001 From: Karthikeyan <6488848+karthikeyann@users.noreply.github.com> Date: Fri, 6 Dec 2024 16:27:03 -0600 Subject: [PATCH 3/7] Fix all null list column with missing child column in JSON 
reader (#17348) Authors: - Karthikeyan (https://github.com/karthikeyann) Approvers: - Nghia Truong (https://github.com/ttnghia) - Basit Ayantunde (https://github.com/lamarrr) - David Wendt (https://github.com/davidwendt) URL: https://github.com/rapidsai/cudf/pull/17348 --- cpp/src/io/json/host_tree_algorithms.cu | 126 ++++++++++++++------- cpp/src/io/json/json_column.cu | 67 ++++++------ cpp/src/io/json/nested_json.hpp | 12 ++ cpp/src/io/json/parser_features.cpp | 58 +++++++--- cpp/tests/io/json/json_test.cpp | 140 ++++++++++++++++++++++++ 5 files changed, 317 insertions(+), 86 deletions(-) diff --git a/cpp/src/io/json/host_tree_algorithms.cu b/cpp/src/io/json/host_tree_algorithms.cu index 7fafa885c66..7b9fc25d1cc 100644 --- a/cpp/src/io/json/host_tree_algorithms.cu +++ b/cpp/src/io/json/host_tree_algorithms.cu @@ -222,18 +222,19 @@ struct json_column_data { using hashmap_of_device_columns = std::unordered_map>; -std::pair, hashmap_of_device_columns> build_tree( - device_json_column& root, - host_span is_str_column_all_nulls, - tree_meta_t& d_column_tree, - device_span d_unique_col_ids, - device_span d_max_row_offsets, - std::vector const& column_names, - NodeIndexT row_array_parent_col_id, - bool is_array_of_arrays, - cudf::io::json_reader_options const& options, - rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr); +std:: + tuple, cudf::detail::host_vector, hashmap_of_device_columns> + build_tree(device_json_column& root, + host_span is_str_column_all_nulls, + tree_meta_t& d_column_tree, + device_span d_unique_col_ids, + device_span d_max_row_offsets, + std::vector const& column_names, + NodeIndexT row_array_parent_col_id, + bool is_array_of_arrays, + cudf::io::json_reader_options const& options, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr); void scatter_offsets(tree_meta_t const& tree, device_span col_ids, @@ -242,6 +243,7 @@ void scatter_offsets(tree_meta_t const& tree, device_span sorted_col_ids, // Reuse this for parent_col_ids tree_meta_t const& d_column_tree, host_span ignore_vals, + host_span is_mixed, hashmap_of_device_columns const& columns, rmm::cuda_stream_view stream); @@ -363,17 +365,17 @@ void make_device_json_column(device_span input, } return std::vector(); }(); - auto const [ignore_vals, columns] = build_tree(root, - is_str_column_all_nulls, - d_column_tree, - d_unique_col_ids, - d_max_row_offsets, - column_names, - row_array_parent_col_id, - is_array_of_arrays, - options, - stream, - mr); + auto const [ignore_vals, is_mixed_pruned, columns] = build_tree(root, + is_str_column_all_nulls, + d_column_tree, + d_unique_col_ids, + d_max_row_offsets, + column_names, + row_array_parent_col_id, + is_array_of_arrays, + options, + stream, + mr); if (ignore_vals.empty()) return; scatter_offsets(tree, col_ids, @@ -382,22 +384,24 @@ void make_device_json_column(device_span input, sorted_col_ids, d_column_tree, ignore_vals, + is_mixed_pruned, columns, stream); } -std::pair, hashmap_of_device_columns> build_tree( - device_json_column& root, - host_span is_str_column_all_nulls, - tree_meta_t& d_column_tree, - device_span d_unique_col_ids, - device_span d_max_row_offsets, - std::vector const& column_names, - NodeIndexT row_array_parent_col_id, - bool is_array_of_arrays, - cudf::io::json_reader_options const& options, - rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr) +std:: + tuple, cudf::detail::host_vector, hashmap_of_device_columns> + build_tree(device_json_column& root, + host_span is_str_column_all_nulls, + tree_meta_t& 
d_column_tree, + device_span d_unique_col_ids, + device_span d_max_row_offsets, + std::vector const& column_names, + NodeIndexT row_array_parent_col_id, + bool is_array_of_arrays, + cudf::io::json_reader_options const& options, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) { bool const is_enabled_lines = options.is_enabled_lines(); bool const is_enabled_mixed_types_as_string = options.is_enabled_mixed_types_as_string(); @@ -488,7 +492,9 @@ std::pair, hashmap_of_device_columns> build_tree // NoPruning: iterate through schema and enforce type. if (adj[parent_node_sentinel].empty()) - return {cudf::detail::make_host_vector(0, stream), {}}; // for empty file + return {cudf::detail::make_host_vector(0, stream), + cudf::detail::make_host_vector(0, stream), + {}}; // for empty file CUDF_EXPECTS(adj[parent_node_sentinel].size() == 1, "Should be 1"); auto expected_types = cudf::detail::make_host_vector(num_columns, stream); std::fill_n(expected_types.begin(), num_columns, NUM_NODE_CLASSES); @@ -551,11 +557,14 @@ std::pair, hashmap_of_device_columns> build_tree auto list_child = schema.child_types.at(this_list_child_name); for (auto const& child_id : child_ids) mark_is_pruned(child_id, list_child); + // TODO: Store null map of non-target types for list children to mark list entry as null. } }; if (is_array_of_arrays) { if (adj[adj[parent_node_sentinel][0]].empty()) - return {cudf::detail::make_host_vector(0, stream), {}}; + return {cudf::detail::make_host_vector(0, stream), + cudf::detail::make_host_vector(0, stream), + {}}; auto root_list_col_id = is_enabled_lines ? adj[parent_node_sentinel][0] : adj[adj[parent_node_sentinel][0]][0]; // mark root and row array col_id as not pruned. @@ -647,8 +656,12 @@ std::pair, hashmap_of_device_columns> build_tree ? adj[parent_node_sentinel][0] : (adj[adj[parent_node_sentinel][0]].empty() ? -1 : adj[adj[parent_node_sentinel][0]][0]); + // List children which are pruned mixed types, nullify parent list row. + auto is_mixed_pruned = cudf::detail::make_host_vector(num_columns, stream); + std::fill_n(is_mixed_pruned.begin(), num_columns, false); auto handle_mixed_types = [&column_categories, &is_str_column_all_nulls, + &is_mixed_pruned, &is_pruned, &expected_types, &is_enabled_mixed_types_as_string, @@ -794,6 +807,14 @@ std::pair, hashmap_of_device_columns> build_tree "list child column insertion failed, duplicate column name in the parent"); ref.get().column_order.emplace_back(list_child_name); auto this_ref = std::ref(ref.get().child_columns.at(list_child_name)); + if (options.is_enabled_experimental()) { + for (auto const& child_id : child_ids) { + if (is_pruned[child_id]) { + // store this child_id for mixed_type nullify parent list_id. + is_mixed_pruned[child_id] = is_pruned[child_id]; + } + } + } // Mixed type handling handle_mixed_types(child_ids); if (child_ids.empty()) { @@ -829,7 +850,7 @@ std::pair, hashmap_of_device_columns> build_tree [](auto exp, auto cat) { return exp == NUM_NODE_CLASSES ? 
cat : exp; }); cudf::detail::cuda_memcpy_async(d_column_tree.node_categories, expected_types, stream); - return {is_pruned, columns}; + return {is_pruned, is_mixed_pruned, columns}; } void scatter_offsets(tree_meta_t const& tree, @@ -839,6 +860,7 @@ void scatter_offsets(tree_meta_t const& tree, device_span sorted_col_ids, // Reuse this for parent_col_ids tree_meta_t const& d_column_tree, host_span ignore_vals, + host_span is_mixed_pruned, hashmap_of_device_columns const& columns, rmm::cuda_stream_view stream) { @@ -857,6 +879,8 @@ void scatter_offsets(tree_meta_t const& tree, auto d_ignore_vals = cudf::detail::make_device_uvector_async( ignore_vals, stream, cudf::get_current_device_resource_ref()); + auto d_is_mixed_pruned = cudf::detail::make_device_uvector_async( + is_mixed_pruned, stream, cudf::get_current_device_resource_ref()); auto d_columns_data = cudf::detail::make_device_uvector_async( columns_data, stream, cudf::get_current_device_resource_ref()); @@ -921,9 +945,31 @@ void scatter_offsets(tree_meta_t const& tree, column_categories[col_ids[parent_node_id]] == NC_LIST and (!d_ignore_vals[col_ids[parent_node_id]]); }); + // For children of list and in ignore_vals, find it's parent node id, and set corresponding + // parent's null mask to null. Setting mixed type list rows to null. + auto const num_list_children = thrust::distance( + thrust::make_zip_iterator(node_ids.begin(), parent_col_ids.begin()), list_children_end); + thrust::for_each_n( + rmm::exec_policy_nosync(stream), + thrust::make_counting_iterator(0), + num_list_children, + [node_ids = node_ids.begin(), + parent_node_ids = tree.parent_node_ids.begin(), + column_categories = d_column_tree.node_categories.begin(), + col_ids = col_ids.begin(), + row_offsets = row_offsets.begin(), + d_is_mixed_pruned = d_is_mixed_pruned.begin(), + d_ignore_vals = d_ignore_vals.begin(), + d_columns_data = d_columns_data.begin()] __device__(size_type i) { + auto const node_id = node_ids[i]; + auto const parent_node_id = parent_node_ids[node_id]; + if (parent_node_id == parent_node_sentinel or d_ignore_vals[col_ids[parent_node_id]]) return; + if (column_categories[col_ids[parent_node_id]] == NC_LIST and + d_is_mixed_pruned[col_ids[node_id]]) { + clear_bit(d_columns_data[col_ids[parent_node_id]].validity, row_offsets[parent_node_id]); + } + }); - auto const num_list_children = - list_children_end - thrust::make_zip_iterator(node_ids.begin(), parent_col_ids.begin()); thrust::stable_sort_by_key(rmm::exec_policy_nosync(stream), parent_col_ids.begin(), parent_col_ids.begin() + num_list_children, diff --git a/cpp/src/io/json/json_column.cu b/cpp/src/io/json/json_column.cu index 30a154fdda2..1fe58a0449f 100644 --- a/cpp/src/io/json/json_column.cu +++ b/cpp/src/io/json/json_column.cu @@ -464,46 +464,49 @@ std::pair, std::vector> device_json_co column_names.emplace_back( json_col.child_columns.empty() ? list_child_name : json_col.child_columns.begin()->first); - // Note: json_col modified here, reuse the memory + // If child is not present, set the null mask correctly, but offsets are zero, and children + // are empty. Note: json_col modified here, reuse the memory auto offsets_column = std::make_unique(data_type{type_id::INT32}, num_rows + 1, json_col.child_offsets.release(), rmm::device_buffer{}, 0); // Create children column - auto child_schema_element = - json_col.child_columns.empty() ? 
std::optional{} : get_list_child_schema(); - auto [child_column, names] = - json_col.child_columns.empty() or (prune_columns and !child_schema_element.has_value()) - ? std::pair, - // EMPTY type could not used because gather throws exception on EMPTY type. - std::vector>{std::make_unique( - data_type{type_id::INT8}, - 0, - rmm::device_buffer{}, - rmm::device_buffer{}, - 0), - std::vector{}} - : device_json_column_to_cudf_column(json_col.child_columns.begin()->second, - d_input, - options, - prune_columns, - child_schema_element, - stream, - mr); + auto child_schema_element = get_list_child_schema(); + auto [child_column, names] = [&]() { + if (json_col.child_columns.empty()) { + // EMPTY type could not used because gather throws exception on EMPTY type. + auto empty_col = make_empty_column( + child_schema_element.value_or(schema_element{data_type{type_id::INT8}}), stream, mr); + auto children_metadata = std::vector{ + make_column_name_info( + child_schema_element.value_or(schema_element{data_type{type_id::INT8}}), + list_child_name) + .children}; + + return std::pair, std::vector>{ + std::move(empty_col), children_metadata}; + } + return device_json_column_to_cudf_column(json_col.child_columns.begin()->second, + d_input, + options, + prune_columns, + child_schema_element, + stream, + mr); + }(); column_names.back().children = names; auto [result_bitmask, null_count] = make_validity(json_col); - auto ret_col = make_lists_column(num_rows, - std::move(offsets_column), - std::move(child_column), - 0, - rmm::device_buffer{0, stream, mr}, - stream, - mr); - // The null_mask is set after creation of list column is to skip the purge_nonempty_nulls and - // null validation applied in make_lists_column factory, which is not needed for json - // parent column cannot be null when its children is non-empty in JSON - if (null_count != 0) { ret_col->set_null_mask(std::move(result_bitmask), null_count); } + auto ret_col = make_lists_column( + num_rows, + std::move(offsets_column), + std::move(child_column), + null_count, + null_count == 0 ? 
rmm::device_buffer{0, stream, mr} : std::move(result_bitmask), + stream, + mr); + // Since some rows in child column may need to be nullified due to mixed types, we can not + // skip the purge_nonempty_nulls call in make_lists_column factory return {std::move(ret_col), std::move(column_names)}; } default: CUDF_FAIL("Unsupported column type"); break; diff --git a/cpp/src/io/json/nested_json.hpp b/cpp/src/io/json/nested_json.hpp index 4989fff4b30..2f6942fe139 100644 --- a/cpp/src/io/json/nested_json.hpp +++ b/cpp/src/io/json/nested_json.hpp @@ -429,6 +429,18 @@ table_with_metadata device_parse_nested_json(device_span input, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr); +/** + * @brief Create empty column of a given nested schema + * + * @param schema The schema of the column to create + * @param stream The CUDA stream to which kernels are dispatched + * @param mr resource with which to allocate + * @return The empty column + */ +std::unique_ptr make_empty_column(schema_element const& schema, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr); + /** * @brief Create all null column of a given nested schema * diff --git a/cpp/src/io/json/parser_features.cpp b/cpp/src/io/json/parser_features.cpp index ced7acb9cde..2da320b2af3 100644 --- a/cpp/src/io/json/parser_features.cpp +++ b/cpp/src/io/json/parser_features.cpp @@ -159,7 +159,17 @@ struct empty_column_functor { std::unique_ptr child = cudf::type_dispatcher( schema.child_types.at(child_name).type, *this, schema.child_types.at(child_name)); auto offsets = make_empty_column(data_type(type_to_id())); - return make_lists_column(0, std::move(offsets), std::move(child), 0, {}, stream, mr); + std::vector> child_columns; + child_columns.push_back(std::move(offsets)); + child_columns.push_back(std::move(child)); + // Do not use `cudf::make_lists_column` since we do not need to call `purge_nonempty_nulls` on + // the child column as it does not have non-empty nulls. 
Look issue #17356 + return std::make_unique(cudf::data_type{type_id::LIST}, + 0, + rmm::device_buffer{}, + rmm::device_buffer{}, + 0, + std::move(child_columns)); } template )> @@ -174,6 +184,13 @@ struct empty_column_functor { } }; +std::unique_ptr make_empty_column(schema_element const& schema, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + return cudf::type_dispatcher(schema.type, empty_column_functor{stream, mr}, schema); +} + /// Created all null column of the specified schema struct allnull_column_functor { rmm::cuda_stream_view stream; @@ -198,10 +215,9 @@ struct allnull_column_functor { std::unique_ptr operator()(schema_element const& schema, size_type size) const { CUDF_EXPECTS(schema.child_types.size() == 1, "Dictionary column should have only one child"); - auto const& child_name = schema.child_types.begin()->first; - std::unique_ptr child = cudf::type_dispatcher(schema.child_types.at(child_name).type, - empty_column_functor{stream, mr}, - schema.child_types.at(child_name)); + auto const& child_name = schema.child_types.begin()->first; + std::unique_ptr child = + make_empty_column(schema.child_types.at(child_name), stream, mr); return make_fixed_width_column(schema.type, size, mask_state::ALL_NULL, stream, mr); auto indices = make_zeroed_offsets(size - 1); auto null_mask = cudf::detail::create_null_mask(size, mask_state::ALL_NULL, stream, mr); @@ -221,14 +237,22 @@ struct allnull_column_functor { std::unique_ptr operator()(schema_element const& schema, size_type size) const { CUDF_EXPECTS(schema.child_types.size() == 1, "List column should have only one child"); - auto const& child_name = schema.child_types.begin()->first; - std::unique_ptr child = cudf::type_dispatcher(schema.child_types.at(child_name).type, - empty_column_functor{stream, mr}, - schema.child_types.at(child_name)); - auto offsets = make_zeroed_offsets(size); + auto const& child_name = schema.child_types.begin()->first; + std::unique_ptr child = + make_empty_column(schema.child_types.at(child_name), stream, mr); + auto offsets = make_zeroed_offsets(size); auto null_mask = cudf::detail::create_null_mask(size, mask_state::ALL_NULL, stream, mr); - return make_lists_column( - size, std::move(offsets), std::move(child), size, std::move(null_mask), stream, mr); + std::vector> child_columns; + child_columns.push_back(std::move(offsets)); + child_columns.push_back(std::move(child)); + // Do not use `cudf::make_lists_column` since we do not need to call `purge_nonempty_nulls` on + // the child column as it does not have non-empty nulls. Look issue #17356 + return std::make_unique(cudf::data_type{type_id::LIST}, + size, + rmm::device_buffer{}, + std::move(null_mask), + size, + std::move(child_columns)); } template )> @@ -240,8 +264,14 @@ struct allnull_column_functor { schema.child_types.at(child_name).type, *this, schema.child_types.at(child_name), size)); } auto null_mask = cudf::detail::create_null_mask(size, mask_state::ALL_NULL, stream, mr); - return make_structs_column( - size, std::move(child_columns), size, std::move(null_mask), stream, mr); + // Do not use `cudf::make_structs_column` since we do not need to call `superimpose_nulls` on + // the children columns. 
Look issue #17356 + return std::make_unique(cudf::data_type{type_id::STRUCT}, + size, + rmm::device_buffer{}, + std::move(null_mask), + size, + std::move(child_columns)); } }; diff --git a/cpp/tests/io/json/json_test.cpp b/cpp/tests/io/json/json_test.cpp index 3c8db99c3c7..37a750330fa 100644 --- a/cpp/tests/io/json/json_test.cpp +++ b/cpp/tests/io/json/json_test.cpp @@ -56,6 +56,8 @@ using int16_wrapper = wrapper; using int64_wrapper = wrapper; using timestamp_ms_wrapper = wrapper; using bool_wrapper = wrapper; +using size_type_wrapper = wrapper; +using strings_wrapper = cudf::test::strings_column_wrapper; using cudf::data_type; using cudf::type_id; @@ -3253,6 +3255,144 @@ TEST_F(JsonReaderTest, JsonNestedDtypeFilterWithOrder) CUDF_TEST_EXPECT_COLUMNS_EQUAL(result.tbl->get_column(2), *wrapped); } } + + // test list (all-null) of struct (empty) of string (empty) + { + std::string json_stringl = R"( + {"a" : [1], "c2": [1, 2]} + {} + )"; + auto lines = true; + cudf::io::json_reader_options in_options = + cudf::io::json_reader_options::builder( + cudf::io::source_info{json_stringl.data(), json_stringl.size()}) + .prune_columns(true) + .experimental(true) + .lines(lines); + + cudf::io::schema_element dtype_schema{ + data_type{cudf::type_id::STRUCT}, + { + {"a", {data_type{cudf::type_id::LIST}, {{"element", {dtype()}}}}}, + {"c2", + {data_type{cudf::type_id::LIST}, + {{"element", + {data_type{cudf::type_id::STRUCT}, + { + {"d", {data_type{cudf::type_id::STRING}}}, + }, + {{"d"}}}}}}}, + }, + {{"a", "c2"}}}; + in_options.set_dtypes(dtype_schema); + cudf::io::table_with_metadata result = cudf::io::read_json(in_options); + // Make sure we have column "a":[int64_t] + ASSERT_EQ(result.tbl->num_columns(), 2); + ASSERT_EQ(result.metadata.schema_info.size(), 2); + EXPECT_EQ(result.metadata.schema_info[0].name, "a"); + ASSERT_EQ(result.metadata.schema_info[0].children.size(), 2); + EXPECT_EQ(result.metadata.schema_info[0].children[0].name, "offsets"); + EXPECT_EQ(result.metadata.schema_info[0].children[1].name, "element"); + // Make sure we have all null list "c2": [{"d": ""}] + EXPECT_EQ(result.metadata.schema_info[1].name, "c2"); + ASSERT_EQ(result.metadata.schema_info[1].children.size(), 2); + EXPECT_EQ(result.metadata.schema_info[1].children[0].name, "offsets"); + EXPECT_EQ(result.metadata.schema_info[1].children[1].name, "element"); + ASSERT_EQ(result.metadata.schema_info[1].children[1].children.size(), 1); + EXPECT_EQ(result.metadata.schema_info[1].children[1].children[0].name, "d"); + + auto const expected0 = [&] { + auto const valids = std::vector{1, 0}; + auto [null_mask, null_count] = + cudf::test::detail::make_null_mask(valids.begin(), valids.end()); + return cudf::make_lists_column(2, + size_type_wrapper{0, 1, 1}.release(), + int64_wrapper{1}.release(), + null_count, + std::move(null_mask)); + }(); + + auto const expected1 = [&] { + auto const get_structs = [] { + auto child = cudf::test::strings_column_wrapper{}; + return cudf::test::structs_column_wrapper{{child}}; + }; + auto const valids = std::vector{0, 0}; + auto [null_mask, null_count] = + cudf::test::detail::make_null_mask(valids.begin(), valids.end()); + return cudf::make_lists_column(2, + size_type_wrapper{0, 0, 0}.release(), + get_structs().release(), + null_count, + std::move(null_mask)); + }(); + + CUDF_TEST_EXPECT_COLUMNS_EQUAL(*expected0, result.tbl->get_column(0).view()); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(*expected1, result.tbl->get_column(1).view()); + } +} + +TEST_F(JsonReaderTest, NullifyMixedList) +{ + using namespace 
cudf::test::iterators; + // test list + std::string json_stringl = R"( + {"c2": []} + {"c2": [{}]} + {"c2": [[]]} + {"c2": [{}, [], {}]} + {"c2": [[123], {"b": "1"}]} + {"c2": [{"x": "y"}, {"b": "1"}]} + {} + )"; + // [], [{null, null}], null, null, null, [{null, null}, {1, null}], null + // valid 1 1 0 0 0 1 0 + // ofset 0, 0, 1, 1, 1, 1, 3, 3 + // child {null, null}, {null, null}, {1, null} + cudf::io::json_reader_options in_options = + cudf::io::json_reader_options::builder( + cudf::io::source_info{json_stringl.data(), json_stringl.size()}) + .prune_columns(true) + .experimental(true) + .lines(true); + + // struct>> eg. {"c2": [{"b": "1", "c": "2"}]} + cudf::io::schema_element dtype_schema{data_type{cudf::type_id::STRUCT}, + { + {"c2", + {data_type{cudf::type_id::LIST}, + {{"element", + {data_type{cudf::type_id::STRUCT}, + { + {"b", {data_type{cudf::type_id::STRING}}}, + {"c", {data_type{cudf::type_id::STRING}}}, + }, + {{"b", "c"}}}}}}}, + }, + {{"c2"}}}; + in_options.set_dtypes(dtype_schema); + cudf::io::table_with_metadata result = cudf::io::read_json(in_options); + ASSERT_EQ(result.tbl->num_columns(), 1); + ASSERT_EQ(result.metadata.schema_info.size(), 1); + + // Expected: A list of struct of 2-string columns + // [], [{null, null}], null, null, null, [{null, null}, {1, null}], null + auto get_structs = [] { + strings_wrapper child0{{"", "", "1"}, nulls_at({0, 0, 1})}; + strings_wrapper child1{{"", "", ""}, all_nulls()}; + // purge non-empty nulls in list seems to retain nullmask in struct child column + return cudf::test::structs_column_wrapper{{child0, child1}, no_nulls()}.release(); + }; + std::vector const list_nulls{1, 1, 0, 0, 0, 1, 0}; + auto [null_mask, null_count] = + cudf::test::detail::make_null_mask(list_nulls.cbegin(), list_nulls.cend()); + auto const expected = cudf::make_lists_column( + 7, + cudf::test::fixed_width_column_wrapper{0, 0, 1, 1, 1, 1, 3, 3}.release(), + get_structs(), + null_count, + std::move(null_mask)); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(*expected, result.tbl->get_column(0).view()); } struct JsonCompressedIOTest : public cudf::test::BaseFixture, From 80fc629aab1cc459b9ff8f0e9fee379a82219815 Mon Sep 17 00:00:00 2001 From: Bradley Dice Date: Sat, 7 Dec 2024 01:41:33 -0600 Subject: [PATCH 4/7] Update cuda-python lower bounds to 12.6.2 / 11.8.5 (#17547) We require a newer cuda-python lower bound for new features and to use the new layout. This will fix a number of errors observed when the runtime version of cuda-python is older than the version used to build packages using Cython features from cuda-python. See https://github.com/rapidsai/build-planning/issues/117#issuecomment-2524250915 for details. 
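As an illustration only (not part of this change), the build/runtime mismatch described above can also be guarded against at import time. The snippet below is a rough sketch that assumes the PyPI distribution name `cuda-python` and the lower bounds introduced in this PR; it is not something added by this patch:

    from importlib.metadata import version
    from packaging.version import Version

    # Installed runtime copy of cuda-python.
    installed = Version(version("cuda-python"))

    # Lower bounds introduced by this change, keyed by CUDA major version.
    required = Version("12.6.2") if installed.major >= 12 else Version("11.8.5")

    if installed < required:
        raise RuntimeError(
            f"cuda-python {installed} is older than the required {required}; "
            "upgrade it to match the version used to build these packages."
        )

Expressing the same constraint in the packaging metadata (as the diffs below do) makes this check unnecessary for pip/conda installs, since the resolver will refuse to pair the new wheels with an older cuda-python.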
Authors: - Bradley Dice (https://github.com/bdice) Approvers: - James Lamb (https://github.com/jameslamb) URL: https://github.com/rapidsai/cudf/pull/17547 --- conda/environments/all_cuda-118_arch-x86_64.yaml | 2 +- conda/environments/all_cuda-125_arch-x86_64.yaml | 2 +- conda/recipes/cudf/meta.yaml | 4 ++-- conda/recipes/pylibcudf/meta.yaml | 4 ++-- dependencies.yaml | 8 ++++---- python/cudf/pyproject.toml | 2 +- python/pylibcudf/pyproject.toml | 2 +- 7 files changed, 12 insertions(+), 12 deletions(-) diff --git a/conda/environments/all_cuda-118_arch-x86_64.yaml b/conda/environments/all_cuda-118_arch-x86_64.yaml index 87c40421be0..bad508154aa 100644 --- a/conda/environments/all_cuda-118_arch-x86_64.yaml +++ b/conda/environments/all_cuda-118_arch-x86_64.yaml @@ -19,7 +19,7 @@ dependencies: - cramjam - cubinlinker - cuda-nvtx=11.8 -- cuda-python>=11.7.1,<12.0a0 +- cuda-python>=11.8.5,<12.0a0 - cuda-sanitizer-api=11.8.86 - cuda-version=11.8 - cudatoolkit diff --git a/conda/environments/all_cuda-125_arch-x86_64.yaml b/conda/environments/all_cuda-125_arch-x86_64.yaml index 0935de96d19..969124a29ad 100644 --- a/conda/environments/all_cuda-125_arch-x86_64.yaml +++ b/conda/environments/all_cuda-125_arch-x86_64.yaml @@ -21,7 +21,7 @@ dependencies: - cuda-nvcc - cuda-nvrtc-dev - cuda-nvtx-dev -- cuda-python>=12.0,<13.0a0 +- cuda-python>=12.6.2,<13.0a0 - cuda-sanitizer-api - cuda-version=12.5 - cupy>=12.0.0 diff --git a/conda/recipes/cudf/meta.yaml b/conda/recipes/cudf/meta.yaml index e52b8c5f2a0..2c16deeed82 100644 --- a/conda/recipes/cudf/meta.yaml +++ b/conda/recipes/cudf/meta.yaml @@ -91,7 +91,7 @@ requirements: - cudatoolkit - ptxcompiler >=0.7.0 - cubinlinker # CUDA enhanced compatibility. - - cuda-python >=11.7.1,<12.0a0 + - cuda-python >=11.8.5,<12.0a0 {% else %} - cuda-cudart - libcufile # [linux64] @@ -100,7 +100,7 @@ requirements: # TODO: Add nvjitlink here # xref: https://github.com/rapidsai/cudf/issues/12822 - cuda-nvrtc - - cuda-python >=12.0,<13.0a0 + - cuda-python >=12.6.2,<13.0a0 - pynvjitlink {% endif %} - {{ pin_compatible('cuda-version', max_pin='x', min_pin='x') }} diff --git a/conda/recipes/pylibcudf/meta.yaml b/conda/recipes/pylibcudf/meta.yaml index 3d965f30986..08eab363af0 100644 --- a/conda/recipes/pylibcudf/meta.yaml +++ b/conda/recipes/pylibcudf/meta.yaml @@ -83,9 +83,9 @@ requirements: - {{ pin_compatible('rmm', max_pin='x.x') }} - fsspec >=0.6.0 {% if cuda_major == "11" %} - - cuda-python >=11.7.1,<12.0a0 + - cuda-python >=11.8.5,<12.0a0 {% else %} - - cuda-python >=12.0,<13.0a0 + - cuda-python >=12.6.2,<13.0a0 {% endif %} - nvtx >=0.2.1 - packaging diff --git a/dependencies.yaml b/dependencies.yaml index 044c7d187b3..3c55ce2c614 100644 --- a/dependencies.yaml +++ b/dependencies.yaml @@ -679,10 +679,10 @@ dependencies: matrices: - matrix: {cuda: "12.*"} packages: - - cuda-python>=12.0,<13.0a0 + - cuda-python>=12.6.2,<13.0a0 - matrix: {cuda: "11.*"} packages: &run_pylibcudf_packages_all_cu11 - - cuda-python>=11.7.1,<12.0a0 + - cuda-python>=11.8.5,<12.0a0 - {matrix: null, packages: *run_pylibcudf_packages_all_cu11} run_cudf: common: @@ -705,10 +705,10 @@ dependencies: matrices: - matrix: {cuda: "12.*"} packages: - - cuda-python>=12.0,<13.0a0 + - cuda-python>=12.6.2,<13.0a0 - matrix: {cuda: "11.*"} packages: &run_cudf_packages_all_cu11 - - cuda-python>=11.7.1,<12.0a0 + - cuda-python>=11.8.5,<12.0a0 - {matrix: null, packages: *run_cudf_packages_all_cu11} - output_types: conda matrices: diff --git a/python/cudf/pyproject.toml b/python/cudf/pyproject.toml index 
80de9056a0a..21c18ef0174 100644 --- a/python/cudf/pyproject.toml +++ b/python/cudf/pyproject.toml @@ -20,7 +20,7 @@ requires-python = ">=3.10" dependencies = [ "cachetools", "cubinlinker", - "cuda-python>=11.7.1,<12.0a0", + "cuda-python>=11.8.5,<12.0a0", "cupy-cuda11x>=12.0.0", "fsspec>=0.6.0", "libcudf==25.2.*,>=0.0.0a0", diff --git a/python/pylibcudf/pyproject.toml b/python/pylibcudf/pyproject.toml index a5e5704b8ed..53ee3e2b56e 100644 --- a/python/pylibcudf/pyproject.toml +++ b/python/pylibcudf/pyproject.toml @@ -18,7 +18,7 @@ authors = [ license = { text = "Apache 2.0" } requires-python = ">=3.10" dependencies = [ - "cuda-python>=11.7.1,<12.0a0", + "cuda-python>=11.8.5,<12.0a0", "libcudf==25.2.*,>=0.0.0a0", "nvtx>=0.2.1", "packaging", From a0fc6a89a596ebae7df436be25aed70ec908f83e Mon Sep 17 00:00:00 2001 From: David Wendt <45795991+davidwendt@users.noreply.github.com> Date: Mon, 9 Dec 2024 09:33:08 -0500 Subject: [PATCH 5/7] Use cooperative-groups instead of cub warp-reduce for strings contains (#17540) Replaces the `cub::WarpReduce` usage in `cudf::strings::contains` with cooperative-groups `any()`. The change is only for the `contains_warp_parallel` kernel which is used for wider strings. Using cooperative-groups generates more efficient code for the same results and gives an additional 11-14% performance improvement. Authors: - David Wendt (https://github.com/davidwendt) Approvers: - Yunsong Wang (https://github.com/PointKernel) - Nghia Truong (https://github.com/ttnghia) - Shruti Shivakumar (https://github.com/shrshi) URL: https://github.com/rapidsai/cudf/pull/17540 --- cpp/src/strings/search/find.cu | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/cpp/src/strings/search/find.cu b/cpp/src/strings/search/find.cu index 0f33fcb6fe1..94bc81ec933 100644 --- a/cpp/src/strings/search/find.cu +++ b/cpp/src/strings/search/find.cu @@ -32,6 +32,7 @@ #include #include +#include #include #include #include @@ -347,13 +348,15 @@ CUDF_KERNEL void contains_warp_parallel_fn(column_device_view const d_strings, string_view const d_target, bool* d_results) { - auto const idx = cudf::detail::grid_1d::global_thread_id(); - using warp_reduce = cub::WarpReduce; - __shared__ typename warp_reduce::TempStorage temp_storage; + auto const idx = cudf::detail::grid_1d::global_thread_id(); auto const str_idx = idx / cudf::detail::warp_size; if (str_idx >= d_strings.size()) { return; } - auto const lane_idx = idx % cudf::detail::warp_size; + + namespace cg = cooperative_groups; + auto const warp = cg::tiled_partition(cg::this_thread_block()); + auto const lane_idx = warp.thread_rank(); + if (d_strings.is_null(str_idx)) { return; } // get the string for this warp auto const d_str = d_strings.element(str_idx); @@ -373,7 +376,7 @@ CUDF_KERNEL void contains_warp_parallel_fn(column_device_view const d_strings, } } - auto const result = warp_reduce(temp_storage).Reduce(found, cub::Max()); + auto const result = warp.any(found); if (lane_idx == 0) { d_results[str_idx] = result; } } From 0f5d4b9514b92f69465f4d76b1f9db1c5a37f33a Mon Sep 17 00:00:00 2001 From: Matthew Murray <41342305+Matt711@users.noreply.github.com> Date: Mon, 9 Dec 2024 10:41:26 -0500 Subject: [PATCH 6/7] Remove unused IO utilities from cudf python (#17374) Removes unused IO utilities from cuDF Python. 
Depends on #17163 #16042 #17252 #17263 Authors: - Matthew Murray (https://github.com/Matt711) Approvers: - Bradley Dice (https://github.com/bdice) URL: https://github.com/rapidsai/cudf/pull/17374 --- python/cudf/cudf/_lib/io/utils.pxd | 6 +-- python/cudf/cudf/_lib/io/utils.pyx | 87 ++---------------------------- 2 files changed, 5 insertions(+), 88 deletions(-) diff --git a/python/cudf/cudf/_lib/io/utils.pxd b/python/cudf/cudf/_lib/io/utils.pxd index 96504ebdd66..9b8bab012e2 100644 --- a/python/cudf/cudf/_lib/io/utils.pxd +++ b/python/cudf/cudf/_lib/io/utils.pxd @@ -13,9 +13,6 @@ from pylibcudf.libcudf.io.types cimport ( from cudf._lib.column cimport Column -cdef sink_info make_sinks_info( - list src, vector[unique_ptr[data_sink]] & data) except* -cdef sink_info make_sink_info(src, unique_ptr[data_sink] & data) except* cdef add_df_col_struct_names( df, child_names_dict @@ -26,7 +23,8 @@ cdef update_col_struct_field_names( ) cdef update_struct_field_names( table, - vector[column_name_info]& schema_info) + vector[column_name_info]& schema_info +) cdef Column update_column_struct_field_names( Column col, column_name_info& info diff --git a/python/cudf/cudf/_lib/io/utils.pyx b/python/cudf/cudf/_lib/io/utils.pyx index f23980b387a..df4675be599 100644 --- a/python/cudf/cudf/_lib/io/utils.pyx +++ b/python/cudf/cudf/_lib/io/utils.pyx @@ -1,97 +1,16 @@ # Copyright (c) 2020-2024, NVIDIA CORPORATION. -from cpython.buffer cimport PyBUF_READ -from cpython.memoryview cimport PyMemoryView_FromMemory -from libcpp.memory cimport unique_ptr + from libcpp.string cimport string -from libcpp.utility cimport move + from libcpp.vector cimport vector -from pylibcudf.libcudf.io.data_sink cimport data_sink -from pylibcudf.libcudf.io.types cimport ( - column_name_info, - sink_info, -) +from pylibcudf.libcudf.io.types cimport column_name_info from cudf._lib.column cimport Column -import codecs -import io -import os - from cudf.core.dtypes import StructDtype -# Converts the Python sink input to libcudf IO sink_info. -cdef sink_info make_sinks_info( - list src, vector[unique_ptr[data_sink]] & sink -) except*: - cdef vector[data_sink *] data_sinks - cdef vector[string] paths - if isinstance(src[0], io.StringIO): - data_sinks.reserve(len(src)) - for s in src: - sink.push_back(unique_ptr[data_sink](new iobase_data_sink(s))) - data_sinks.push_back(sink.back().get()) - return sink_info(data_sinks) - elif isinstance(src[0], io.TextIOBase): - data_sinks.reserve(len(src)) - for s in src: - # Files opened in text mode expect writes to be str rather than - # bytes, which requires conversion from utf-8. If the underlying - # buffer is utf-8, we can bypass this conversion by writing - # directly to it. 
- if codecs.lookup(s.encoding).name not in {"utf-8", "ascii"}: - raise NotImplementedError(f"Unsupported encoding {s.encoding}") - sink.push_back( - unique_ptr[data_sink](new iobase_data_sink(s.buffer)) - ) - data_sinks.push_back(sink.back().get()) - return sink_info(data_sinks) - elif isinstance(src[0], io.IOBase): - data_sinks.reserve(len(src)) - for s in src: - sink.push_back(unique_ptr[data_sink](new iobase_data_sink(s))) - data_sinks.push_back(sink.back().get()) - return sink_info(data_sinks) - elif isinstance(src[0], (basestring, os.PathLike)): - paths.reserve(len(src)) - for s in src: - paths.push_back( os.path.expanduser(s).encode()) - return sink_info(move(paths)) - else: - raise TypeError("Unrecognized input type: {}".format(type(src))) - - -cdef sink_info make_sink_info(src, unique_ptr[data_sink] & sink) except*: - cdef vector[unique_ptr[data_sink]] datasinks - cdef sink_info info = make_sinks_info([src], datasinks) - if not datasinks.empty(): - sink.swap(datasinks[0]) - return info - - -# Adapts a python io.IOBase object as a libcudf IO data_sink. This lets you -# write from cudf to any python file-like object (File/BytesIO/SocketIO etc) -cdef cppclass iobase_data_sink(data_sink): - object buf - - iobase_data_sink(object buf_): - this.buf = buf_ - - void host_write(const void * data, size_t size) with gil: - if isinstance(buf, io.StringIO): - buf.write(PyMemoryView_FromMemory(data, size, PyBUF_READ) - .tobytes().decode()) - else: - buf.write(PyMemoryView_FromMemory(data, size, PyBUF_READ)) - - void flush() with gil: - buf.flush() - - size_t bytes_written() with gil: - return buf.tell() - - cdef add_df_col_struct_names(df, child_names_dict): for name, child_names in child_names_dict.items(): col = df._data[name] From ba3ed5773171a545d43d9e0f598c6c2eb37ec122 Mon Sep 17 00:00:00 2001 From: Vukasin Milovanovic Date: Mon, 9 Dec 2024 10:08:13 -0800 Subject: [PATCH 7/7] Fix nvcc-imposed UB in `constexpr` functions (#17534) nvcc does not support `constexpr` functions that are not well-defined to call from the device. This is UB even when the function is not called from the device. Throwing an exception is one such operation. This PR cleans up error handling for functions that are called from device, and removes `constexpr` from the ones that are not actually used from the device, or in the constexpr context. Authors: - Vukasin Milovanovic (https://github.com/vuule) - MithunR (https://github.com/mythrocks) Approvers: - Karthikeyan (https://github.com/karthikeyann) - MithunR (https://github.com/mythrocks) - David Wendt (https://github.com/davidwendt) - Vyas Ramasubramani (https://github.com/vyasr) - Bradley Dice (https://github.com/bdice) - Mike Wilson (https://github.com/hyperbolic2346) - Yunsong Wang (https://github.com/PointKernel) - Muhammad Haseeb (https://github.com/mhaseeb123) URL: https://github.com/rapidsai/cudf/pull/17534 --- .../cudf/detail/utilities/device_operators.cuh | 18 +++++++++++++++++- cpp/include/cudf/utilities/span.hpp | 2 ++ cpp/src/io/orc/writer_impl.cu | 2 +- cpp/src/io/utilities/time_utils.cuh | 6 +++--- 4 files changed, 23 insertions(+), 5 deletions(-) diff --git a/cpp/include/cudf/detail/utilities/device_operators.cuh b/cpp/include/cudf/detail/utilities/device_operators.cuh index 46f424e051b..d16be5e22dd 100644 --- a/cpp/include/cudf/detail/utilities/device_operators.cuh +++ b/cpp/include/cudf/detail/utilities/device_operators.cuh @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2023, NVIDIA CORPORATION. + * Copyright (c) 2019-2024, NVIDIA CORPORATION. 
* * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -83,7 +83,11 @@ struct DeviceSum { template ()>* = nullptr> static constexpr T identity() { +#ifndef __CUDA_ARCH__ CUDF_FAIL("fixed_point does not yet support device operator identity"); +#else + CUDF_UNREACHABLE("fixed_point does not yet support device operator identity"); +#endif return T{}; } }; @@ -141,7 +145,11 @@ struct DeviceMin { template ()>* = nullptr> static constexpr T identity() { +#ifndef __CUDA_ARCH__ CUDF_FAIL("fixed_point does not yet support DeviceMin identity"); +#else + CUDF_UNREACHABLE("fixed_point does not yet support DeviceMin identity"); +#endif return cuda::std::numeric_limits::max(); } @@ -189,7 +197,11 @@ struct DeviceMax { template ()>* = nullptr> static constexpr T identity() { +#ifndef __CUDA_ARCH__ CUDF_FAIL("fixed_point does not yet support DeviceMax identity"); +#else + CUDF_UNREACHABLE("fixed_point does not yet support DeviceMax identity"); +#endif return cuda::std::numeric_limits::lowest(); } @@ -225,7 +237,11 @@ struct DeviceProduct { template ()>* = nullptr> static constexpr T identity() { +#ifndef __CUDA_ARCH__ CUDF_FAIL("fixed_point does not yet support DeviceProduct identity"); +#else + CUDF_UNREACHABLE("fixed_point does not yet support DeviceProduct identity"); +#endif return T{1, numeric::scale_type{0}}; } }; diff --git a/cpp/include/cudf/utilities/span.hpp b/cpp/include/cudf/utilities/span.hpp index 21ee4fa9e9b..2273a89892b 100644 --- a/cpp/include/cudf/utilities/span.hpp +++ b/cpp/include/cudf/utilities/span.hpp @@ -417,7 +417,9 @@ class base_2dspan { constexpr base_2dspan(RowType flat_view, size_t columns) : _flat{flat_view}, _size{columns == 0 ? 0 : flat_view.size() / columns, columns} { +#ifndef __CUDA_ARCH__ CUDF_EXPECTS(_size.first * _size.second == flat_view.size(), "Invalid 2D span size"); +#endif } /** diff --git a/cpp/src/io/orc/writer_impl.cu b/cpp/src/io/orc/writer_impl.cu index d432deb8e79..76e5369ffd0 100644 --- a/cpp/src/io/orc/writer_impl.cu +++ b/cpp/src/io/orc/writer_impl.cu @@ -506,7 +506,7 @@ size_t max_varint_size() return cudf::util::div_rounding_up_unsafe(sizeof(T) * 8, 7); } -constexpr size_t RLE_stream_size(TypeKind kind, size_t count) +size_t RLE_stream_size(TypeKind kind, size_t count) { using cudf::util::div_rounding_up_unsafe; constexpr auto byte_rle_max_len = 128; diff --git a/cpp/src/io/utilities/time_utils.cuh b/cpp/src/io/utilities/time_utils.cuh index 687766c1bcc..ff1b9f58e6c 100644 --- a/cpp/src/io/utilities/time_utils.cuh +++ b/cpp/src/io/utilities/time_utils.cuh @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021, NVIDIA CORPORATION. + * Copyright (c) 2021-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -32,7 +32,7 @@ static const __device__ __constant__ int32_t powers_of_ten[10] = { struct get_period { template - constexpr int32_t operator()() + int32_t operator()() { if constexpr (is_chrono()) { return T::period::den; } CUDF_FAIL("Invalid, non chrono type"); @@ -42,7 +42,7 @@ struct get_period { /** * @brief Function that translates cuDF time unit to clock frequency */ -constexpr int32_t to_clockrate(type_id timestamp_type_id) +inline int32_t to_clockrate(type_id timestamp_type_id) { return timestamp_type_id == type_id::EMPTY ? 0