From 2f5bf7659e40cd27bb35f10785e233aad5481bbd Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Tue, 10 Dec 2024 09:37:46 -0800 Subject: [PATCH] Simplify serialization protocols (#17552) This rewrites all serialization protocols in cudf to remove the need for pickling intermediates. --- python/cudf/cudf/_lib/copying.pyx | 11 +-- python/cudf/cudf/core/_base_index.py | 8 -- python/cudf/cudf/core/abc.py | 16 ++-- python/cudf/cudf/core/buffer/buffer.py | 8 +- .../cudf/cudf/core/buffer/spillable_buffer.py | 4 +- python/cudf/cudf/core/column/column.py | 23 +++--- python/cudf/cudf/core/dataframe.py | 9 +- python/cudf/cudf/core/dtypes.py | 77 +++++++----------- python/cudf/cudf/core/frame.py | 73 +++++++++++++---- python/cudf/cudf/core/groupby/groupby.py | 13 ++- python/cudf/cudf/core/index.py | 13 +-- python/cudf/cudf/core/multiindex.py | 7 +- python/cudf/cudf/core/resample.py | 12 +-- python/cudf/cudf/core/series.py | 9 +- .../stringColumnWithRangeIndex_cudf_23.12.pkl | Bin 1394 -> 1108 bytes python/cudf/cudf/tests/test_serialize.py | 19 ++++- python/cudf/cudf/tests/test_struct.py | 2 +- .../dask_cudf/tests/test_distributed.py | 16 +++- 18 files changed, 179 insertions(+), 141 deletions(-) diff --git a/python/cudf/cudf/_lib/copying.pyx b/python/cudf/cudf/_lib/copying.pyx index 4dfb12d8ab3..c478cd1a990 100644 --- a/python/cudf/cudf/_lib/copying.pyx +++ b/python/cudf/cudf/_lib/copying.pyx @@ -1,7 +1,5 @@ # Copyright (c) 2020-2024, NVIDIA CORPORATION. -import pickle - from libcpp cimport bool from libcpp.memory cimport unique_ptr from libcpp.utility cimport move @@ -367,14 +365,13 @@ class PackedColumns(Serializable): header["index-names"] = self.index_names header["metadata"] = self._metadata.tobytes() for name, dtype in self.column_dtypes.items(): - dtype_header, dtype_frames = dtype.serialize() + dtype_header, dtype_frames = dtype.device_serialize() self.column_dtypes[name] = ( dtype_header, (len(frames), len(frames) + len(dtype_frames)), ) frames.extend(dtype_frames) header["column-dtypes"] = self.column_dtypes - header["type-serialized"] = pickle.dumps(type(self)) return header, frames @classmethod @@ -382,9 +379,9 @@ class PackedColumns(Serializable): column_dtypes = {} for name, dtype in header["column-dtypes"].items(): dtype_header, (start, stop) = dtype - column_dtypes[name] = pickle.loads( - dtype_header["type-serialized"] - ).deserialize(dtype_header, frames[start:stop]) + column_dtypes[name] = Serializable.device_deserialize( + dtype_header, frames[start:stop] + ) return cls( plc.contiguous_split.pack( plc.contiguous_split.unpack_from_memoryviews( diff --git a/python/cudf/cudf/core/_base_index.py b/python/cudf/cudf/core/_base_index.py index a6abd63d042..950ce5f1236 100644 --- a/python/cudf/cudf/core/_base_index.py +++ b/python/cudf/cudf/core/_base_index.py @@ -2,7 +2,6 @@ from __future__ import annotations -import pickle import warnings from functools import cached_property from typing import TYPE_CHECKING, Any, Literal @@ -330,13 +329,6 @@ def get_level_values(self, level): else: raise KeyError(f"Requested level with name {level} " "not found") - @classmethod - def deserialize(cls, header, frames): - # Dispatch deserialization to the appropriate index type in case - # deserialization is ever attempted with the base class directly. - idx_type = pickle.loads(header["type-serialized"]) - return idx_type.deserialize(header, frames) - @property def names(self): """ diff --git a/python/cudf/cudf/core/abc.py b/python/cudf/cudf/core/abc.py index ce6bb83bc77..c8ea03b04fe 100644 --- a/python/cudf/cudf/core/abc.py +++ b/python/cudf/cudf/core/abc.py @@ -1,8 +1,6 @@ # Copyright (c) 2020-2024, NVIDIA CORPORATION. """Common abstract base classes for cudf.""" -import pickle - import numpy import cudf @@ -22,6 +20,14 @@ class Serializable: latter converts back from that representation into an equivalent object. """ + # A mapping from class names to the classes themselves. This is used to + # reconstruct the correct class when deserializing an object. + _name_type_map: dict = {} + + def __init_subclass__(cls, /, **kwargs): + super().__init_subclass__(**kwargs) + cls._name_type_map[cls.__name__] = cls + def serialize(self): """Generate an equivalent serializable representation of an object. @@ -98,7 +104,7 @@ def device_serialize(self): ) for f in frames ) - header["type-serialized"] = pickle.dumps(type(self)) + header["type-serialized-name"] = type(self).__name__ header["is-cuda"] = [ hasattr(f, "__cuda_array_interface__") for f in frames ] @@ -128,10 +134,10 @@ def device_deserialize(cls, header, frames): :meta private: """ - typ = pickle.loads(header["type-serialized"]) + typ = cls._name_type_map[header["type-serialized-name"]] frames = [ cudf.core.buffer.as_buffer(f) if c else memoryview(f) - for c, f in zip(header["is-cuda"], frames) + for c, f in zip(header["is-cuda"], frames, strict=True) ] return typ.deserialize(header, frames) diff --git a/python/cudf/cudf/core/buffer/buffer.py b/python/cudf/cudf/core/buffer/buffer.py index ffa306bf93f..625938ca168 100644 --- a/python/cudf/cudf/core/buffer/buffer.py +++ b/python/cudf/cudf/core/buffer/buffer.py @@ -3,7 +3,6 @@ from __future__ import annotations import math -import pickle import weakref from types import SimpleNamespace from typing import TYPE_CHECKING, Any, Literal @@ -432,8 +431,7 @@ def serialize(self) -> tuple[dict, list]: second element is a list containing single frame. """ header: dict[str, Any] = {} - header["type-serialized"] = pickle.dumps(type(self)) - header["owner-type-serialized"] = pickle.dumps(type(self._owner)) + header["owner-type-serialized-name"] = type(self._owner).__name__ header["frame_count"] = 1 frames = [self] return header, frames @@ -460,7 +458,9 @@ def deserialize(cls, header: dict, frames: list) -> Self: if isinstance(frame, cls): return frame # The frame is already deserialized - owner_type: BufferOwner = pickle.loads(header["owner-type-serialized"]) + owner_type: BufferOwner = Serializable._name_type_map[ + header["owner-type-serialized-name"] + ] if hasattr(frame, "__cuda_array_interface__"): owner = owner_type.from_device_memory(frame, exposed=False) else: diff --git a/python/cudf/cudf/core/buffer/spillable_buffer.py b/python/cudf/cudf/core/buffer/spillable_buffer.py index b40c56c9a6b..66f8be4ddc5 100644 --- a/python/cudf/cudf/core/buffer/spillable_buffer.py +++ b/python/cudf/cudf/core/buffer/spillable_buffer.py @@ -3,7 +3,6 @@ from __future__ import annotations import collections.abc -import pickle import time import weakref from threading import RLock @@ -415,8 +414,7 @@ def serialize(self) -> tuple[dict, list]: header: dict[str, Any] = {} frames: list[Buffer | memoryview] with self._owner.lock: - header["type-serialized"] = pickle.dumps(self.__class__) - header["owner-type-serialized"] = pickle.dumps(type(self._owner)) + header["owner-type-serialized-name"] = type(self._owner).__name__ header["frame_count"] = 1 if self.is_spilled: frames = [self.memoryview()] diff --git a/python/cudf/cudf/core/column/column.py b/python/cudf/cudf/core/column/column.py index f6eaea4b783..4b1e9c1129e 100644 --- a/python/cudf/cudf/core/column/column.py +++ b/python/cudf/cudf/core/column/column.py @@ -2,7 +2,6 @@ from __future__ import annotations -import pickle from collections import abc from collections.abc import MutableSequence, Sequence from functools import cached_property @@ -1224,28 +1223,27 @@ def serialize(self) -> tuple[dict, list]: header: dict[Any, Any] = {} frames = [] - header["type-serialized"] = pickle.dumps(type(self)) try: - dtype, dtype_frames = self.dtype.serialize() + dtype, dtype_frames = self.dtype.device_serialize() header["dtype"] = dtype frames.extend(dtype_frames) header["dtype-is-cudf-serialized"] = True except AttributeError: - header["dtype"] = pickle.dumps(self.dtype) + header["dtype"] = self.dtype.str header["dtype-is-cudf-serialized"] = False if self.data is not None: - data_header, data_frames = self.data.serialize() + data_header, data_frames = self.data.device_serialize() header["data"] = data_header frames.extend(data_frames) if self.mask is not None: - mask_header, mask_frames = self.mask.serialize() + mask_header, mask_frames = self.mask.device_serialize() header["mask"] = mask_header frames.extend(mask_frames) if self.children: child_headers, child_frames = zip( - *(c.serialize() for c in self.children) + *(c.device_serialize() for c in self.children) ) header["subheaders"] = list(child_headers) frames.extend(chain(*child_frames)) @@ -1257,8 +1255,7 @@ def serialize(self) -> tuple[dict, list]: def deserialize(cls, header: dict, frames: list) -> ColumnBase: def unpack(header, frames) -> tuple[Any, list]: count = header["frame_count"] - klass = pickle.loads(header["type-serialized"]) - obj = klass.deserialize(header, frames[:count]) + obj = cls.device_deserialize(header, frames[:count]) return obj, frames[count:] assert header["frame_count"] == len(frames), ( @@ -1268,7 +1265,7 @@ def unpack(header, frames) -> tuple[Any, list]: if header["dtype-is-cudf-serialized"]: dtype, frames = unpack(header["dtype"], frames) else: - dtype = pickle.loads(header["dtype"]) + dtype = np.dtype(header["dtype"]) if "data" in header: data, frames = unpack(header["data"], frames) else: @@ -2219,7 +2216,9 @@ def serialize_columns(columns: list[ColumnBase]) -> tuple[list[dict], list]: frames = [] if len(columns) > 0: - header_columns = [c.serialize() for c in columns] + header_columns: list[tuple[dict, list]] = [ + c.device_serialize() for c in columns + ] headers, column_frames = zip(*header_columns) for f in column_frames: frames.extend(f) @@ -2236,7 +2235,7 @@ def deserialize_columns(headers: list[dict], frames: list) -> list[ColumnBase]: for meta in headers: col_frame_count = meta["frame_count"] - col_typ = pickle.loads(meta["type-serialized"]) + col_typ = Serializable._name_type_map[meta["type-serialized-name"]] colobj = col_typ.deserialize(meta, frames[:col_frame_count]) columns.append(colobj) # Advance frames diff --git a/python/cudf/cudf/core/dataframe.py b/python/cudf/cudf/core/dataframe.py index bd78d5dd9f1..fd68a40324e 100644 --- a/python/cudf/cudf/core/dataframe.py +++ b/python/cudf/cudf/core/dataframe.py @@ -7,7 +7,6 @@ import itertools import numbers import os -import pickle import re import sys import textwrap @@ -44,7 +43,6 @@ ) from cudf.core import column, df_protocol, indexing_utils, reshape from cudf.core._compat import PANDAS_LT_300 -from cudf.core.abc import Serializable from cudf.core.buffer import acquire_spill_lock from cudf.core.column import ( CategoricalColumn, @@ -582,7 +580,7 @@ class _DataFrameiAtIndexer(_DataFrameIlocIndexer): pass -class DataFrame(IndexedFrame, Serializable, GetAttrGetItemMixin): +class DataFrame(IndexedFrame, GetAttrGetItemMixin): """ A GPU Dataframe object. @@ -1184,7 +1182,7 @@ def _constructor_expanddim(self): def serialize(self): header, frames = super().serialize() - header["index"], index_frames = self.index.serialize() + header["index"], index_frames = self.index.device_serialize() header["index_frame_count"] = len(index_frames) # For backwards compatibility with older versions of cuDF, index # columns are placed before data columns. @@ -1199,8 +1197,7 @@ def deserialize(cls, header, frames): header, frames[header["index_frame_count"] :] ) - idx_typ = pickle.loads(header["index"]["type-serialized"]) - index = idx_typ.deserialize(header["index"], frames[:index_nframes]) + index = cls.device_deserialize(header["index"], frames[:index_nframes]) obj.index = index return obj diff --git a/python/cudf/cudf/core/dtypes.py b/python/cudf/cudf/core/dtypes.py index 2110e610c37..8765a27a165 100644 --- a/python/cudf/cudf/core/dtypes.py +++ b/python/cudf/cudf/core/dtypes.py @@ -3,7 +3,6 @@ import decimal import operator -import pickle import textwrap import warnings from functools import cached_property @@ -91,13 +90,13 @@ def dtype(arbitrary): raise TypeError(f"Cannot interpret {arbitrary} as a valid cuDF dtype") -def _decode_type( +def _check_type( cls: type, header: dict, frames: list, is_valid_class: Callable[[type, type], bool] = operator.is_, -) -> tuple[dict, list, type]: - """Decode metadata-encoded type and check validity +) -> None: + """Perform metadata-encoded type and check validity Parameters ---------- @@ -112,12 +111,6 @@ class performing deserialization serialization by `cls` (default is to check type equality), called as `is_valid_class(decoded_class, cls)`. - Returns - ------- - tuple - Tuple of validated headers, frames, and the decoded class - constructor. - Raises ------ AssertionError @@ -128,11 +121,10 @@ class performing deserialization f"Deserialization expected {header['frame_count']} frames, " f"but received {len(frames)}." ) - klass = pickle.loads(header["type-serialized"]) assert is_valid_class( - klass, cls + klass := Serializable._name_type_map[header["type-serialized-name"]], + cls, ), f"Header-encoded {klass=} does not match decoding {cls=}." - return header, frames, klass class _BaseDtype(ExtensionDtype, Serializable): @@ -305,13 +297,14 @@ def construct_from_string(self): def serialize(self): header = {} - header["type-serialized"] = pickle.dumps(type(self)) header["ordered"] = self.ordered frames = [] if self.categories is not None: - categories_header, categories_frames = self.categories.serialize() + categories_header, categories_frames = ( + self.categories.device_serialize() + ) header["categories"] = categories_header frames.extend(categories_frames) header["frame_count"] = len(frames) @@ -319,15 +312,14 @@ def serialize(self): @classmethod def deserialize(cls, header, frames): - header, frames, klass = _decode_type(cls, header, frames) + _check_type(cls, header, frames) ordered = header["ordered"] categories_header = header["categories"] categories_frames = frames - categories_type = pickle.loads(categories_header["type-serialized"]) - categories = categories_type.deserialize( + categories = Serializable.device_deserialize( categories_header, categories_frames ) - return klass(categories=categories, ordered=ordered) + return cls(categories=categories, ordered=ordered) def __repr__(self): return self.to_pandas().__repr__() @@ -495,12 +487,13 @@ def __hash__(self): def serialize(self) -> tuple[dict, list]: header: dict[str, Dtype] = {} - header["type-serialized"] = pickle.dumps(type(self)) frames = [] if isinstance(self.element_type, _BaseDtype): - header["element-type"], frames = self.element_type.serialize() + header["element-type"], frames = ( + self.element_type.device_serialize() + ) else: header["element-type"] = getattr( self.element_type, "name", self.element_type @@ -510,14 +503,14 @@ def serialize(self) -> tuple[dict, list]: @classmethod def deserialize(cls, header: dict, frames: list): - header, frames, klass = _decode_type(cls, header, frames) + _check_type(cls, header, frames) if isinstance(header["element-type"], dict): - element_type = pickle.loads( - header["element-type"]["type-serialized"] - ).deserialize(header["element-type"], frames) + element_type = Serializable.device_deserialize( + header["element-type"], frames + ) else: element_type = header["element-type"] - return klass(element_type=element_type) + return cls(element_type=element_type) @cached_property def itemsize(self): @@ -641,7 +634,6 @@ def __hash__(self): def serialize(self) -> tuple[dict, list]: header: dict[str, Any] = {} - header["type-serialized"] = pickle.dumps(type(self)) frames: list[Buffer] = [] @@ -649,33 +641,31 @@ def serialize(self) -> tuple[dict, list]: for k, dtype in self.fields.items(): if isinstance(dtype, _BaseDtype): - dtype_header, dtype_frames = dtype.serialize() + dtype_header, dtype_frames = dtype.device_serialize() fields[k] = ( dtype_header, (len(frames), len(frames) + len(dtype_frames)), ) frames.extend(dtype_frames) else: - fields[k] = pickle.dumps(dtype) + fields[k] = dtype.str header["fields"] = fields header["frame_count"] = len(frames) return header, frames @classmethod def deserialize(cls, header: dict, frames: list): - header, frames, klass = _decode_type(cls, header, frames) + _check_type(cls, header, frames) fields = {} for k, dtype in header["fields"].items(): if isinstance(dtype, tuple): dtype_header, (start, stop) = dtype - fields[k] = pickle.loads( - dtype_header["type-serialized"] - ).deserialize( + fields[k] = Serializable.device_deserialize( dtype_header, frames[start:stop], ) else: - fields[k] = pickle.loads(dtype) + fields[k] = np.dtype(dtype) return cls(fields) @cached_property @@ -838,7 +828,6 @@ def _from_decimal(cls, decimal): def serialize(self) -> tuple[dict, list]: return ( { - "type-serialized": pickle.dumps(type(self)), "precision": self.precision, "scale": self.scale, "frame_count": 0, @@ -848,11 +837,8 @@ def serialize(self) -> tuple[dict, list]: @classmethod def deserialize(cls, header: dict, frames: list): - header, frames, klass = _decode_type( - cls, header, frames, is_valid_class=issubclass - ) - klass = pickle.loads(header["type-serialized"]) - return klass(header["precision"], header["scale"]) + _check_type(cls, header, frames, is_valid_class=issubclass) + return cls(header["precision"], header["scale"]) def __eq__(self, other: Dtype) -> bool: if other is self: @@ -960,18 +946,17 @@ def __hash__(self): def serialize(self) -> tuple[dict, list]: header = { - "type-serialized": pickle.dumps(type(self)), - "fields": pickle.dumps((self.subtype, self.closed)), + "fields": (self.subtype.str, self.closed), "frame_count": 0, } return header, [] @classmethod def deserialize(cls, header: dict, frames: list): - header, frames, klass = _decode_type(cls, header, frames) - klass = pickle.loads(header["type-serialized"]) - subtype, closed = pickle.loads(header["fields"]) - return klass(subtype, closed=closed) + _check_type(cls, header, frames) + subtype, closed = header["fields"] + subtype = np.dtype(subtype) + return cls(subtype, closed=closed) def _is_categorical_dtype(obj): diff --git a/python/cudf/cudf/core/frame.py b/python/cudf/cudf/core/frame.py index 30868924bcd..f7af374ca8d 100644 --- a/python/cudf/cudf/core/frame.py +++ b/python/cudf/cudf/core/frame.py @@ -3,7 +3,6 @@ from __future__ import annotations import operator -import pickle import warnings from collections import abc from typing import TYPE_CHECKING, Any, Literal @@ -22,6 +21,7 @@ from cudf import _lib as libcudf from cudf.api.types import is_dtype_equal, is_scalar from cudf.core._compat import PANDAS_LT_300 +from cudf.core.abc import Serializable from cudf.core.buffer import acquire_spill_lock from cudf.core.column import ( ColumnBase, @@ -45,7 +45,7 @@ # TODO: It looks like Frame is missing a declaration of `copy`, need to add -class Frame(BinaryOperand, Scannable): +class Frame(BinaryOperand, Scannable, Serializable): """A collection of Column objects with an optional index. Parameters @@ -95,37 +95,80 @@ def ndim(self) -> int: @_performance_tracking def serialize(self): # TODO: See if self._data can be serialized outright + frames = [] header = { - "type-serialized": pickle.dumps(type(self)), - "column_names": pickle.dumps(self._column_names), - "column_rangeindex": pickle.dumps(self._data.rangeindex), - "column_multiindex": pickle.dumps(self._data.multiindex), - "column_label_dtype": pickle.dumps(self._data.label_dtype), - "column_level_names": pickle.dumps(self._data._level_names), + "column_label_dtype": None, + "dtype-is-cudf-serialized": False, } - header["columns"], frames = serialize_columns(self._columns) + if (label_dtype := self._data.label_dtype) is not None: + try: + header["column_label_dtype"], frames = ( + label_dtype.device_serialize() + ) + header["dtype-is-cudf-serialized"] = True + except AttributeError: + header["column_label_dtype"] = label_dtype.str + + header["columns"], column_frames = serialize_columns(self._columns) + column_names, column_names_numpy_type = ( + zip( + *[ + (cname.item(), type(cname).__name__) + if isinstance(cname, np.generic) + else (cname, "") + for cname in self._column_names + ] + ) + if self._column_names + else ((), ()) + ) + header |= { + "column_names": column_names, + "column_names_numpy_type": column_names_numpy_type, + "column_rangeindex": self._data.rangeindex, + "column_multiindex": self._data.multiindex, + "column_level_names": self._data._level_names, + } + frames.extend(column_frames) + return header, frames @classmethod @_performance_tracking def deserialize(cls, header, frames): - cls_deserialize = pickle.loads(header["type-serialized"]) - column_names = pickle.loads(header["column_names"]) - columns = deserialize_columns(header["columns"], frames) kwargs = {} + dtype_header = header["column_label_dtype"] + if header["dtype-is-cudf-serialized"]: + count = dtype_header["frame_count"] + kwargs["label_dtype"] = cls.device_deserialize( + header, frames[:count] + ) + frames = frames[count:] + else: + kwargs["label_dtype"] = ( + np.dtype(dtype_header) if dtype_header is not None else None + ) + + columns = deserialize_columns(header["columns"], frames) for metadata in [ "rangeindex", "multiindex", - "label_dtype", "level_names", ]: key = f"column_{metadata}" if key in header: - kwargs[metadata] = pickle.loads(header[key]) + kwargs[metadata] = header[key] + + column_names = [ + getattr(np, cntype)(cname) if cntype != "" else cname + for cname, cntype in zip( + header["column_names"], header["column_names_numpy_type"] + ) + ] col_accessor = ColumnAccessor( data=dict(zip(column_names, columns)), **kwargs ) - return cls_deserialize._from_data(col_accessor) + return cls._from_data(col_accessor) @classmethod @_performance_tracking diff --git a/python/cudf/cudf/core/groupby/groupby.py b/python/cudf/cudf/core/groupby/groupby.py index e59b948aba9..a7ced1b833a 100644 --- a/python/cudf/cudf/core/groupby/groupby.py +++ b/python/cudf/cudf/core/groupby/groupby.py @@ -3,7 +3,6 @@ import copy import itertools -import pickle import textwrap import warnings from collections import abc @@ -1265,7 +1264,7 @@ def serialize(self): obj_header, obj_frames = self.obj.serialize() header["obj"] = obj_header - header["obj_type"] = pickle.dumps(type(self.obj)) + header["obj_type_name"] = type(self.obj).__name__ header["num_obj_frames"] = len(obj_frames) frames.extend(obj_frames) @@ -1280,7 +1279,7 @@ def serialize(self): def deserialize(cls, header, frames): kwargs = header["kwargs"] - obj_type = pickle.loads(header["obj_type"]) + obj_type = Serializable._name_type_map[header["obj_type_name"]] obj = obj_type.deserialize( header["obj"], frames[: header["num_obj_frames"]] ) @@ -3304,8 +3303,8 @@ def _handle_misc(self, by): def serialize(self): header = {} frames = [] - header["names"] = pickle.dumps(self.names) - header["_named_columns"] = pickle.dumps(self._named_columns) + header["names"] = self.names + header["_named_columns"] = self._named_columns column_header, column_frames = cudf.core.column.serialize_columns( self._key_columns ) @@ -3315,8 +3314,8 @@ def serialize(self): @classmethod def deserialize(cls, header, frames): - names = pickle.loads(header["names"]) - _named_columns = pickle.loads(header["_named_columns"]) + names = header["names"] + _named_columns = header["_named_columns"] key_columns = cudf.core.column.deserialize_columns( header["columns"], frames ) diff --git a/python/cudf/cudf/core/index.py b/python/cudf/cudf/core/index.py index 1b90e9f9df0..244bd877c1a 100644 --- a/python/cudf/cudf/core/index.py +++ b/python/cudf/cudf/core/index.py @@ -3,7 +3,6 @@ from __future__ import annotations import operator -import pickle import warnings from collections.abc import Hashable, MutableMapping from functools import cache, cached_property @@ -495,9 +494,8 @@ def serialize(self): header["index_column"]["step"] = self.step frames = [] - header["name"] = pickle.dumps(self.name) - header["dtype"] = pickle.dumps(self.dtype) - header["type-serialized"] = pickle.dumps(type(self)) + header["name"] = self.name + header["dtype"] = self.dtype.str header["frame_count"] = 0 return header, frames @@ -505,11 +503,14 @@ def serialize(self): @_performance_tracking def deserialize(cls, header, frames): h = header["index_column"] - name = pickle.loads(header["name"]) + name = header["name"] start = h["start"] stop = h["stop"] step = h.get("step", 1) - return RangeIndex(start=start, stop=stop, step=step, name=name) + dtype = np.dtype(header["dtype"]) + return RangeIndex( + start=start, stop=stop, step=step, dtype=dtype, name=name + ) @property # type: ignore @_performance_tracking diff --git a/python/cudf/cudf/core/multiindex.py b/python/cudf/cudf/core/multiindex.py index bfff62f0a89..a878b072860 100644 --- a/python/cudf/cudf/core/multiindex.py +++ b/python/cudf/cudf/core/multiindex.py @@ -5,7 +5,6 @@ import itertools import numbers import operator -import pickle import warnings from functools import cached_property from typing import TYPE_CHECKING, Any @@ -918,15 +917,15 @@ def take(self, indices) -> Self: def serialize(self): header, frames = super().serialize() # Overwrite the names in _data with the true names. - header["column_names"] = pickle.dumps(self.names) + header["column_names"] = self.names return header, frames @classmethod @_performance_tracking def deserialize(cls, header, frames): # Spoof the column names to construct the frame, then set manually. - column_names = pickle.loads(header["column_names"]) - header["column_names"] = pickle.dumps(range(0, len(column_names))) + column_names = header["column_names"] + header["column_names"] = range(0, len(column_names)) obj = super().deserialize(header, frames) return obj._set_names(column_names) diff --git a/python/cudf/cudf/core/resample.py b/python/cudf/cudf/core/resample.py index d95d252559f..391ee31f125 100644 --- a/python/cudf/cudf/core/resample.py +++ b/python/cudf/cudf/core/resample.py @@ -15,7 +15,6 @@ # limitations under the License. from __future__ import annotations -import pickle import warnings from typing import TYPE_CHECKING @@ -26,6 +25,7 @@ import cudf from cudf._lib.column import Column +from cudf.core.abc import Serializable from cudf.core.buffer import acquire_spill_lock from cudf.core.groupby.groupby import ( DataFrameGroupBy, @@ -97,21 +97,21 @@ def serialize(self): header, frames = super().serialize() grouping_head, grouping_frames = self.grouping.serialize() header["grouping"] = grouping_head - header["resampler_type"] = pickle.dumps(type(self)) + header["resampler_type"] = type(self).__name__ header["grouping_frames_count"] = len(grouping_frames) frames.extend(grouping_frames) return header, frames @classmethod def deserialize(cls, header, frames): - obj_type = pickle.loads(header["obj_type"]) + obj_type = Serializable._name_type_map[header["obj_type_name"]] obj = obj_type.deserialize( header["obj"], frames[: header["num_obj_frames"]] ) grouping = _ResampleGrouping.deserialize( header["grouping"], frames[header["num_obj_frames"] :] ) - resampler_cls = pickle.loads(header["resampler_type"]) + resampler_cls = Serializable._name_type_map[header["resampler_type"]] out = resampler_cls.__new__(resampler_cls) out.grouping = grouping super().__init__(out, obj, by=grouping) @@ -163,8 +163,8 @@ def serialize(self): @classmethod def deserialize(cls, header, frames): - names = pickle.loads(header["names"]) - _named_columns = pickle.loads(header["_named_columns"]) + names = header["names"] + _named_columns = header["_named_columns"] key_columns = cudf.core.column.deserialize_columns( header["columns"], frames[: -header["__bin_labels_count"]] ) diff --git a/python/cudf/cudf/core/series.py b/python/cudf/cudf/core/series.py index 9b60424c924..778db5973bf 100644 --- a/python/cudf/cudf/core/series.py +++ b/python/cudf/cudf/core/series.py @@ -4,7 +4,6 @@ import functools import inspect -import pickle import textwrap import warnings from collections import abc @@ -28,7 +27,6 @@ ) from cudf.core import indexing_utils from cudf.core._compat import PANDAS_LT_300 -from cudf.core.abc import Serializable from cudf.core.buffer import acquire_spill_lock from cudf.core.column import ( ColumnBase, @@ -415,7 +413,7 @@ def _loc_to_iloc(self, arg): return indices -class Series(SingleColumnFrame, IndexedFrame, Serializable): +class Series(SingleColumnFrame, IndexedFrame): """ One-dimensional GPU array (including time series). @@ -900,7 +898,7 @@ def hasnans(self): def serialize(self): header, frames = super().serialize() - header["index"], index_frames = self.index.serialize() + header["index"], index_frames = self.index.device_serialize() header["index_frame_count"] = len(index_frames) # For backwards compatibility with older versions of cuDF, index # columns are placed before data columns. @@ -916,8 +914,7 @@ def deserialize(cls, header, frames): header, frames[header["index_frame_count"] :] ) - idx_typ = pickle.loads(header["index"]["type-serialized"]) - index = idx_typ.deserialize(header["index"], frames[:index_nframes]) + index = cls.device_deserialize(header["index"], frames[:index_nframes]) obj.index = index return obj diff --git a/python/cudf/cudf/tests/data/pkl/stringColumnWithRangeIndex_cudf_23.12.pkl b/python/cudf/cudf/tests/data/pkl/stringColumnWithRangeIndex_cudf_23.12.pkl index 1ec077d10f77f4b3b3a8cc1bbfee559707b60dfc..64e06f0631d1475e9b6ab62434227253ad95a28b 100644 GIT binary patch literal 1108 zcmbVLO>fgc5RDs$(AI*6hP3J*AeTf!;=qAJL=aLjB%+)+AZuf9vKEdVt#=7QMdA{g zNPFw>dtqibse&jRuw-kz``(-HZ~l*SzhWPAccjxyrFjXaKH-WfCE*&(ajcVZH!dXa zCQPxhWK#}i{{`AFt&Nx?QIsl5c*$kTvh)jw?{EQMp=}<-MW&~Dl(7-dqC_ob90ump z8lAN4ka*{YmcZK79iz1Lnq!!~%OU)e@tDgYLBJf^ zWTLpxxq{F$&D%+L90|+f0%q_5R?O5ho==o0@h=RRHvW{AA1MTJESlnB=!up%5vPO| zXNc(`=AhIg!CAs3(Fl9bRG+0!Kpd?_6cX4u;!%A{ehlhxnq{~ZHZyW6~dW# z6#59Qik1o9DVgSz9b9|0T5*c19R^`9Y;wH>6Kr}#rNSZb7~tW_?qQVc>1+|%E}9Bm zH#XOjHZvwATVUBD$>Sm~mDI7Kjj35Aj!T|6$Tg<0guXX|D_o0q=!L-& z#2d4jZlvt#$FN?x+p6&{$?vP5_}EWaQ7~Hf1Ca`zWyQRZSps+@UW*|qi?>_d9{#*v e_j`4>)BpXwUA^hlnLQ{BD$Hjox5;g9SelgkgE>*?OI-{1 zZD)V4o_n)w-Ry8`Aj!$iz27RrL8m8S8T?_E*h)^vM$^<5Wd#d>!agEn4JoBtLvvENwKjjI@<8*+YK3T)J z{*XNaS0Y0J{5?e2DiA8E8jg<6f1(6SA78<2dW^K2!LlxYgHEUO;U`}95$sCnA=rHj z)`~^TgfOqmOgl#2Yr@||=T8ggbK&y3FumRxbeu-?i%pMfO%&D?rCq5ANjQpJ(vFbX z2m;3#6b&Q0DO>QJvD5M3CR{HS(qgU)`l^LVcvK{zc9DcUZoRrs(gA&MNtn7uUL~dP z=2R|KN=aAq`Xrd(=5#uxx|+~*AeqT{GjM};(4czdkjIjeUP4VajzQr+y>9y=pB6)f z(}ZwNuut4Br(u?2o2gKmsZumhHI4EuC#c>8{BjTS9x4a!1lSI%o8D5J^Xb3ZTPFQ8 z-(@kQNs=9AJc$68*f!fWnCx|d*v5}{GrwI7k2AIY`n4Fnk)t;Z+!Ef#i+gsP6V%K^ F?-%(Z>0$r? diff --git a/python/cudf/cudf/tests/test_serialize.py b/python/cudf/cudf/tests/test_serialize.py index 68f2aaf9cab..b50ed04427f 100644 --- a/python/cudf/cudf/tests/test_serialize.py +++ b/python/cudf/cudf/tests/test_serialize.py @@ -7,6 +7,7 @@ import numpy as np import pandas as pd import pytest +from packaging import version import cudf from cudf.testing import _utils as utils, assert_eq @@ -149,13 +150,19 @@ def test_serialize(df, to_host): def test_serialize_dtype_error_checking(): dtype = cudf.IntervalDtype("float", "right") - header, frames = dtype.serialize() - with pytest.raises(AssertionError): - # Invalid number of frames - type(dtype).deserialize(header, [None] * (header["frame_count"] + 1)) + # Must call device_serialize (not serialize) to ensure that the type metadata is + # encoded in the header. + header, frames = dtype.device_serialize() with pytest.raises(AssertionError): # mismatching class cudf.StructDtype.deserialize(header, frames) + # The is-cuda flag list length must match the number of frames + header["is-cuda"] = [False] + with pytest.raises(AssertionError): + # Invalid number of frames + type(dtype).deserialize( + header, [np.zeros(1)] * (header["frame_count"] + 1) + ) def test_serialize_dataframe(): @@ -382,6 +389,10 @@ def test_serialize_string_check_buffer_sizes(): assert expect == got +@pytest.mark.skipif( + version.parse(np.__version__) < version.parse("2.0.0"), + reason="The serialization of numpy 2.0 types is incompatible with numpy 1.x", +) def test_deserialize_cudf_23_12(datadir): fname = datadir / "pkl" / "stringColumnWithRangeIndex_cudf_23.12.pkl" diff --git a/python/cudf/cudf/tests/test_struct.py b/python/cudf/cudf/tests/test_struct.py index 899d78c999b..b85943626a6 100644 --- a/python/cudf/cudf/tests/test_struct.py +++ b/python/cudf/cudf/tests/test_struct.py @@ -79,7 +79,7 @@ def test_series_construction_with_nulls(): ) def test_serialize_struct_dtype(fields): dtype = cudf.StructDtype(fields) - recreated = dtype.__class__.deserialize(*dtype.serialize()) + recreated = dtype.__class__.device_deserialize(*dtype.device_serialize()) assert recreated == dtype diff --git a/python/dask_cudf/dask_cudf/tests/test_distributed.py b/python/dask_cudf/dask_cudf/tests/test_distributed.py index d03180852eb..c28b7e49207 100644 --- a/python/dask_cudf/dask_cudf/tests/test_distributed.py +++ b/python/dask_cudf/dask_cudf/tests/test_distributed.py @@ -4,7 +4,7 @@ import pytest import dask -from dask import dataframe as dd +from dask import array as da, dataframe as dd from dask.distributed import Client from distributed.utils_test import cleanup, loop, loop_in_thread # noqa: F401 @@ -121,3 +121,17 @@ def test_unique(): ddf.x.unique().compute(), check_index=False, ) + + +def test_serialization_of_numpy_types(): + # Dask uses numpy integers as column names, which can break cudf serialization + with dask_cuda.LocalCUDACluster(n_workers=1) as cluster: + with Client(cluster): + with dask.config.set( + {"dataframe.backend": "cudf", "array.backend": "cupy"} + ): + rng = da.random.default_rng() + X_arr = rng.random((100, 10), chunks=(50, 10)) + X = dd.from_dask_array(X_arr) + X = X[X.columns[0]] + X.compute()