Skip to content

Commit

Permalink
Merge pull request #264 from Blosc/dataclass
Browse files Browse the repository at this point in the history
Add CParams, DParams and Storage dataclasses
  • Loading branch information
FrancescAlted authored Sep 25, 2024
2 parents 258ac71 + f2f3ee5 commit c41d1aa
Show file tree
Hide file tree
Showing 42 changed files with 839 additions and 470 deletions.
1 change: 1 addition & 0 deletions doc/reference/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ API Reference
.. toctree::
:maxdepth: 2

storage
top_level
classes
array_operations
34 changes: 34 additions & 0 deletions doc/reference/storage.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
Dataclasses
===========

Dataclasses for setting the compression, decompression
and storage parameters. All of their parameters are optional.

.. currentmodule:: blosc2

CParams
-------

.. autosummary::
:toctree: autofiles/storage
:nosignatures:

CParams

DParams
-------

.. autosummary::
:toctree: autofiles/storage
:nosignatures:

DParams

Storage
-------

.. autosummary::
:toctree: autofiles/storage
:nosignatures:

Storage
73 changes: 20 additions & 53 deletions src/blosc2/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,9 +188,26 @@ class Tuner(Enum):
unpack_tensor,
)

# Internal Blosc threading
# Get CPU info
cpu_info = get_cpu_info()
# Start both values from the "count" entry of the CPU info, falling back to 1
# when that entry is missing. `ncores` is not modified again in this section;
# only `nthreads` is adjusted below.
nthreads = ncores = cpu_info.get("count", 1)
"""Number of threads to be used in compression/decompression.
"""
# Protection against too many threads: never go above 32
nthreads = min(nthreads, 32)
# Experiments show that, with a large number of threads, it is better not to use all of them.
# Drop one thread out of every eight (integer division, so values below 8 are left unchanged).
nthreads -= nthreads // 8
# This import must be before ndarray and schunk
from .storage import (
CParams,
cparams_dflts,
DParams,
dparams_dflts,
Storage,
storage_dflts,
)

from .ndarray import ( # noqa: I001
NDArray,
Expand Down Expand Up @@ -243,60 +260,7 @@ class Tuner(Enum):
"""
The blosc2 version + date.
"""
# Internal Blosc threading
nthreads = ncores = cpu_info.get("count", 1)
"""Number of threads to be used in compression/decompression.
"""
# Protection against too many threads
nthreads = min(nthreads, 32)
# Experiments show that, with a large number of threads, it is better not to use all of them
nthreads -= nthreads // 8
set_nthreads(nthreads)

# Set the number of threads for NumExpr
numexpr.set_num_threads(nthreads)

# Defaults for compression params
cparams_dflts = {
"codec": Codec.ZSTD,
"codec_meta": 0,
"clevel": 1,
"use_dict": False,
"typesize": 8,
"nthreads": nthreads,
"blocksize": 0,
"splitmode": SplitMode.ALWAYS_SPLIT,
"schunk": None,
"filters": [
Filter.NOFILTER,
Filter.NOFILTER,
Filter.NOFILTER,
Filter.NOFILTER,
Filter.NOFILTER,
Filter.SHUFFLE,
],
"filters_meta": [0, 0, 0, 0, 0, 0],
"prefilter": None,
"preparams": None,
"tuner": Tuner.STUNE,
"instr_codec": False,
}
"""
Compression params defaults.
"""

# Defaults for decompression params
dparams_dflts = {"nthreads": nthreads, "schunk": None, "postfilter": None, "postparams": None}
"""
Decompression params defaults.
"""
# Default for storage
storage_dflts = {"contiguous": False, "urlpath": None, "cparams": None, "dparams": None, "io": None}
"""
Storage params defaults. This is meant only for :ref:`SChunk <SChunk>` or :ref:`NDArray <NDArray>`.
"""

_disable_overloaded_equal = False

# Delayed imports for avoiding overwriting of python builtins
from .ndarray import (
Expand Down Expand Up @@ -341,7 +305,9 @@ class Tuner(Enum):
"__version__",
"compress",
"decompress",
"CParams",
"cparams_dflts",
"DParams",
"dparams_dflts",
"storage_dflts",
"set_compressor",
Expand Down Expand Up @@ -373,6 +339,7 @@ class Tuner(Enum):
"compress2",
"decompress2",
"SChunk",
"Storage",
"open",
"remove_urlpath",
"nthreads",
Expand Down
109 changes: 54 additions & 55 deletions src/blosc2/blosc2_ext.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -716,14 +716,16 @@ cdef _check_dparams(blosc2_dparams* dparams, blosc2_cparams* cparams=NULL):

cdef create_cparams_from_kwargs(blosc2_cparams *cparams, kwargs):
if "compcode" in kwargs:
raise NameError("`compcode` has been renamed to `codec`. Please go update your code.")
raise NameError("`compcode` has been renamed to `codec`. Please go update your code.")
if "shuffle" in kwargs:
raise NameError("`shuffle` has been substituted by `filters`. Please go update your code.")
codec = kwargs.get('codec', blosc2.cparams_dflts['codec'])
cparams.compcode = codec if not isinstance(codec, blosc2.Codec) else codec.value
cparams.compcode_meta = kwargs.get('codec_meta', blosc2.cparams_dflts['codec_meta'])
cparams.clevel = kwargs.get('clevel', blosc2.cparams_dflts['clevel'])
cparams.use_dict = kwargs.get('use_dict', blosc2.cparams_dflts['use_dict'])
cparams.typesize = typesize = kwargs.get('typesize', blosc2.cparams_dflts['typesize'])
cparams.nthreads = kwargs.get('nthreads', blosc2.cparams_dflts['nthreads'])
cparams.nthreads = kwargs.get('nthreads', blosc2.nthreads)
cparams.blocksize = kwargs.get('blocksize', blosc2.cparams_dflts['blocksize'])
splitmode = kwargs.get('splitmode', blosc2.cparams_dflts['splitmode'])
cparams.splitmode = splitmode.value
Expand Down Expand Up @@ -804,7 +806,7 @@ def compress2(src, **kwargs):
return dest[:size]

cdef create_dparams_from_kwargs(blosc2_dparams *dparams, kwargs, blosc2_cparams* cparams=NULL):
dparams.nthreads = kwargs.get('nthreads', blosc2.dparams_dflts['nthreads'])
dparams.nthreads = kwargs.get('nthreads', blosc2.nthreads)
dparams.schunk = NULL
dparams.postfilter = NULL
dparams.postparams = NULL
Expand Down Expand Up @@ -927,7 +929,7 @@ cdef class SChunk:
self._urlpath = urlpath.encode() if isinstance(urlpath, str) else urlpath
kwargs["urlpath"] = self._urlpath

self.mode = kwargs.get("mode", "a")
self.mode = blosc2.Storage().mode if kwargs.get("mode", None) is None else kwargs.get("mode")
self.mmap_mode = kwargs.get("mmap_mode")
self.initial_mapping_size = kwargs.get("initial_mapping_size")
if self.mmap_mode is not None:
Expand Down Expand Up @@ -1067,16 +1069,6 @@ cdef class SChunk:
else:
# User codec
codec = self.schunk.storage.cparams.compcode
cparams_dict = {
"codec": codec,
"codec_meta": self.schunk.storage.cparams.compcode_meta,
"clevel": self.schunk.storage.cparams.clevel,
"use_dict": self.schunk.storage.cparams.use_dict,
"typesize": self.schunk.storage.cparams.typesize,
"nthreads": self.schunk.storage.cparams.nthreads,
"blocksize": self.schunk.storage.cparams.blocksize,
"splitmode": blosc2.SplitMode(self.schunk.storage.cparams.splitmode)
}

filters = [0] * BLOSC2_MAX_FILTERS
filters_meta = [0] * BLOSC2_MAX_FILTERS
Expand All @@ -1087,42 +1079,50 @@ cdef class SChunk:
# User filter
filters[i] = self.schunk.filters[i]
filters_meta[i] = self.schunk.filters_meta[i]
cparams_dict["filters"] = filters
cparams_dict["filters_meta"] = filters_meta
return cparams_dict

def update_cparams(self, cparams_dict):
cparams = blosc2.CParams(
codec=codec,
codec_meta=self.schunk.storage.cparams.compcode_meta,
clevel=self.schunk.storage.cparams.clevel,
use_dict=bool(self.schunk.storage.cparams.use_dict),
typesize=self.schunk.storage.cparams.typesize,
nthreads=self.schunk.storage.cparams.nthreads,
blocksize=self.schunk.storage.cparams.blocksize,
splitmode=blosc2.SplitMode(self.schunk.storage.cparams.splitmode),
tuner=blosc2.Tuner(self.schunk.storage.cparams.tuner_id),
filters=filters,
filters_meta=filters_meta,
)

return cparams

def update_cparams(self, new_cparams):
cdef blosc2_cparams* cparams = self.schunk.storage.cparams
codec = cparams_dict.get('codec', None)
if codec is None:
cparams.compcode = cparams.compcode
else:
cparams.compcode = codec if not isinstance(codec, blosc2.Codec) else codec.value
cparams.compcode_meta = cparams_dict.get('codec_meta', cparams.compcode_meta)
cparams.clevel = cparams_dict.get('clevel', cparams.clevel)
cparams.use_dict = cparams_dict.get('use_dict', cparams.use_dict)
cparams.typesize = cparams_dict.get('typesize', cparams.typesize)
cparams.nthreads = cparams_dict.get('nthreads', cparams.nthreads)
cparams.blocksize = cparams_dict.get('blocksize', cparams.blocksize)
splitmode = cparams_dict.get('splitmode', None)
cparams.splitmode = cparams.splitmode if splitmode is None else splitmode.value

filters = cparams_dict.get('filters', None)
if filters is not None:
for i, filter in enumerate(filters):
cparams.filters[i] = filter.value if isinstance(filter, Enum) else filter
for i in range(len(filters), BLOSC2_MAX_FILTERS):
cparams.filters[i] = 0

filters_meta = cparams_dict.get('filters_meta', None)
codec = new_cparams.codec
cparams.compcode = codec if not isinstance(codec, blosc2.Codec) else codec.value
cparams.compcode_meta = new_cparams.codec_meta
cparams.clevel = new_cparams.clevel
cparams.use_dict = new_cparams.use_dict
cparams.typesize = new_cparams.typesize
cparams.nthreads = new_cparams.nthreads
cparams.blocksize = new_cparams.blocksize
cparams.splitmode = new_cparams.splitmode.value
cparams.tuner_id = new_cparams.tuner.value

filters = new_cparams.filters
for i, filter in enumerate(filters):
cparams.filters[i] = filter.value if isinstance(filter, Enum) else filter
for i in range(len(filters), BLOSC2_MAX_FILTERS):
cparams.filters[i] = 0

filters_meta = new_cparams.filters_meta
cdef int8_t meta_value
if filters_meta is not None:
for i, meta in enumerate(filters_meta):
# We still may want to encode negative values
meta_value = <int8_t> meta if meta < 0 else meta
cparams.filters_meta[i] = <uint8_t> meta_value
for i in range(len(filters_meta), BLOSC2_MAX_FILTERS):
cparams.filters_meta[i] = 0
for i, meta in enumerate(filters_meta):
# We still may want to encode negative values
meta_value = <int8_t> meta if meta < 0 else meta
cparams.filters_meta[i] = <uint8_t> meta_value
for i in range(len(filters_meta), BLOSC2_MAX_FILTERS):
cparams.filters_meta[i] = 0

_check_cparams(cparams)

Expand All @@ -1140,12 +1140,11 @@ cdef class SChunk:
self.schunk.filters_meta = self.schunk.storage.cparams.filters_meta

def get_dparams(self):
dparams_dict = {"nthreads": self.schunk.storage.dparams.nthreads}
return dparams_dict
return blosc2.DParams(nthreads=self.schunk.storage.dparams.nthreads)

def update_dparams(self, dparams_dict):
def update_dparams(self, new_dparams):
cdef blosc2_dparams* dparams = self.schunk.storage.dparams
dparams.nthreads = dparams_dict.get('nthreads', dparams.nthreads)
dparams.nthreads = new_dparams.nthreads

_check_dparams(dparams, self.schunk.storage.cparams)

Expand Down Expand Up @@ -1964,17 +1963,17 @@ def open(urlpath, mode, offset, **kwargs):
res = blosc2.NDArray(_schunk=PyCapsule_New(array.sc, <char *> "blosc2_schunk*", NULL),
_array=PyCapsule_New(array, <char *> "b2nd_array_t*", NULL))
if cparams is not None:
res.schunk.cparams = cparams
res.schunk.cparams = cparams if isinstance(cparams, blosc2.CParams) else blosc2.CParams(**cparams)
if dparams is not None:
res.schunk.dparams = dparams
res.schunk.dparams = dparams if isinstance(dparams, blosc2.DParams) else blosc2.DParams(**dparams)
res.schunk.mode = mode
else:
res = blosc2.SChunk(_schunk=PyCapsule_New(schunk, <char *> "blosc2_schunk*", NULL),
mode=mode, **kwargs)
if cparams is not None:
res.cparams = cparams
res.cparams = cparams if isinstance(cparams, blosc2.CParams) else blosc2.CParams(**cparams)
if dparams is not None:
res.dparams = dparams
res.dparams = dparams if isinstance(dparams, blosc2.DParams) else blosc2.DParams(**dparams)

return res

Expand Down
Loading

0 comments on commit c41d1aa

Please sign in to comment.