Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Working with ProxySource: examples. #259

Closed
wants to merge 19 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions doc/reference/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ API Reference
.. toctree::
:maxdepth: 2

storage
top_level
classes
array_operations
34 changes: 34 additions & 0 deletions doc/reference/storage.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
Dataclasses
===========

Dataclasses for setting the compression, decompression
and storage parameters. All their parameters are optional.

.. currentmodule:: blosc2

CParams
-------

.. autosummary::
:toctree: autofiles/storage
:nosignatures:

CParams

DParams
-------

.. autosummary::
:toctree: autofiles/storage
:nosignatures:

DParams

Storage
-------

.. autosummary::
:toctree: autofiles/storage
:nosignatures:

Storage
73 changes: 20 additions & 53 deletions src/blosc2/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,9 +188,26 @@ class Tuner(Enum):
unpack_tensor,
)

# Internal Blosc threading
# Get CPU info
cpu_info = get_cpu_info()
nthreads = ncores = cpu_info.get("count", 1)
"""Number of threads to be used in compression/decompression.
"""
# Protection against too many threads
nthreads = min(nthreads, 32)
# Experiments say that, when using a large number of threads, it is better to not use them all
nthreads -= nthreads // 8

# This import must be before ndarray and schunk
from .storage import (
CParams,
cparams_dflts,
DParams,
dparams_dflts,
Storage,
storage_dflts,
)

from .ndarray import ( # noqa: I001
NDArray,
Expand Down Expand Up @@ -243,60 +260,7 @@ class Tuner(Enum):
"""
The blosc2 version + date.
"""
# Internal Blosc threading
nthreads = ncores = cpu_info.get("count", 1)
"""Number of threads to be used in compression/decompression.
"""
# Protection against too many threads
nthreads = min(nthreads, 32)
# Experiments say that, when using a large number of threads, it is better to not use them all
nthreads -= nthreads // 8
set_nthreads(nthreads)

# Set the number of threads for NumExpr
numexpr.set_num_threads(nthreads)

# Defaults for compression params
cparams_dflts = {
"codec": Codec.ZSTD,
"codec_meta": 0,
"clevel": 1,
"use_dict": False,
"typesize": 8,
"nthreads": nthreads,
"blocksize": 0,
"splitmode": SplitMode.ALWAYS_SPLIT,
"schunk": None,
"filters": [
Filter.NOFILTER,
Filter.NOFILTER,
Filter.NOFILTER,
Filter.NOFILTER,
Filter.NOFILTER,
Filter.SHUFFLE,
],
"filters_meta": [0, 0, 0, 0, 0, 0],
"prefilter": None,
"preparams": None,
"tuner": Tuner.STUNE,
"instr_codec": False,
}
"""
Compression params defaults.
"""

# Defaults for decompression params
dparams_dflts = {"nthreads": nthreads, "schunk": None, "postfilter": None, "postparams": None}
"""
Decompression params defaults.
"""
# Default for storage
storage_dflts = {"contiguous": False, "urlpath": None, "cparams": None, "dparams": None, "io": None}
"""
Storage params defaults. This is meant only for :ref:`SChunk <SChunk>` or :ref:`NDArray <NDArray>`.
"""

_disable_overloaded_equal = False

# Delayed imports for avoiding overwriting of python builtins
from .ndarray import (
Expand Down Expand Up @@ -341,7 +305,9 @@ class Tuner(Enum):
"__version__",
"compress",
"decompress",
"CParams",
"cparams_dflts",
"DParams",
"dparams_dflts",
"storage_dflts",
"set_compressor",
Expand Down Expand Up @@ -373,6 +339,7 @@ class Tuner(Enum):
"compress2",
"decompress2",
"SChunk",
"Storage",
"open",
"remove_urlpath",
"nthreads",
Expand Down
109 changes: 54 additions & 55 deletions src/blosc2/blosc2_ext.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -716,14 +716,16 @@ cdef _check_dparams(blosc2_dparams* dparams, blosc2_cparams* cparams=NULL):

cdef create_cparams_from_kwargs(blosc2_cparams *cparams, kwargs):
if "compcode" in kwargs:
raise NameError("`compcode` has been renamed to `codec`. Please go update your code.")
raise NameError("`compcode` has been renamed to `codec`. Please go update your code.")
if "shuffle" in kwargs:
raise NameError("`shuffle` has been substituted by `filters`. Please go update your code.")
codec = kwargs.get('codec', blosc2.cparams_dflts['codec'])
cparams.compcode = codec if not isinstance(codec, blosc2.Codec) else codec.value
cparams.compcode_meta = kwargs.get('codec_meta', blosc2.cparams_dflts['codec_meta'])
cparams.clevel = kwargs.get('clevel', blosc2.cparams_dflts['clevel'])
cparams.use_dict = kwargs.get('use_dict', blosc2.cparams_dflts['use_dict'])
cparams.typesize = typesize = kwargs.get('typesize', blosc2.cparams_dflts['typesize'])
cparams.nthreads = kwargs.get('nthreads', blosc2.cparams_dflts['nthreads'])
cparams.nthreads = kwargs.get('nthreads', blosc2.nthreads)
cparams.blocksize = kwargs.get('blocksize', blosc2.cparams_dflts['blocksize'])
splitmode = kwargs.get('splitmode', blosc2.cparams_dflts['splitmode'])
cparams.splitmode = splitmode.value
Expand Down Expand Up @@ -804,7 +806,7 @@ def compress2(src, **kwargs):
return dest[:size]

cdef create_dparams_from_kwargs(blosc2_dparams *dparams, kwargs, blosc2_cparams* cparams=NULL):
dparams.nthreads = kwargs.get('nthreads', blosc2.dparams_dflts['nthreads'])
dparams.nthreads = kwargs.get('nthreads', blosc2.nthreads)
dparams.schunk = NULL
dparams.postfilter = NULL
dparams.postparams = NULL
Expand Down Expand Up @@ -927,7 +929,7 @@ cdef class SChunk:
self._urlpath = urlpath.encode() if isinstance(urlpath, str) else urlpath
kwargs["urlpath"] = self._urlpath

self.mode = kwargs.get("mode", "a")
self.mode = blosc2.Storage().mode if kwargs.get("mode", None) is None else kwargs.get("mode")
self.mmap_mode = kwargs.get("mmap_mode")
self.initial_mapping_size = kwargs.get("initial_mapping_size")
if self.mmap_mode is not None:
Expand Down Expand Up @@ -1067,16 +1069,6 @@ cdef class SChunk:
else:
# User codec
codec = self.schunk.storage.cparams.compcode
cparams_dict = {
"codec": codec,
"codec_meta": self.schunk.storage.cparams.compcode_meta,
"clevel": self.schunk.storage.cparams.clevel,
"use_dict": self.schunk.storage.cparams.use_dict,
"typesize": self.schunk.storage.cparams.typesize,
"nthreads": self.schunk.storage.cparams.nthreads,
"blocksize": self.schunk.storage.cparams.blocksize,
"splitmode": blosc2.SplitMode(self.schunk.storage.cparams.splitmode)
}

filters = [0] * BLOSC2_MAX_FILTERS
filters_meta = [0] * BLOSC2_MAX_FILTERS
Expand All @@ -1087,42 +1079,50 @@ cdef class SChunk:
# User filter
filters[i] = self.schunk.filters[i]
filters_meta[i] = self.schunk.filters_meta[i]
cparams_dict["filters"] = filters
cparams_dict["filters_meta"] = filters_meta
return cparams_dict

def update_cparams(self, cparams_dict):
cparams = blosc2.CParams(
codec=codec,
codec_meta=self.schunk.storage.cparams.compcode_meta,
clevel=self.schunk.storage.cparams.clevel,
use_dict=bool(self.schunk.storage.cparams.use_dict),
typesize=self.schunk.storage.cparams.typesize,
nthreads=self.schunk.storage.cparams.nthreads,
blocksize=self.schunk.storage.cparams.blocksize,
splitmode=blosc2.SplitMode(self.schunk.storage.cparams.splitmode),
tuner=blosc2.Tuner(self.schunk.storage.cparams.tuner_id),
filters=filters,
filters_meta=filters_meta,
)

return cparams

def update_cparams(self, new_cparams):
cdef blosc2_cparams* cparams = self.schunk.storage.cparams
codec = cparams_dict.get('codec', None)
if codec is None:
cparams.compcode = cparams.compcode
else:
cparams.compcode = codec if not isinstance(codec, blosc2.Codec) else codec.value
cparams.compcode_meta = cparams_dict.get('codec_meta', cparams.compcode_meta)
cparams.clevel = cparams_dict.get('clevel', cparams.clevel)
cparams.use_dict = cparams_dict.get('use_dict', cparams.use_dict)
cparams.typesize = cparams_dict.get('typesize', cparams.typesize)
cparams.nthreads = cparams_dict.get('nthreads', cparams.nthreads)
cparams.blocksize = cparams_dict.get('blocksize', cparams.blocksize)
splitmode = cparams_dict.get('splitmode', None)
cparams.splitmode = cparams.splitmode if splitmode is None else splitmode.value

filters = cparams_dict.get('filters', None)
if filters is not None:
for i, filter in enumerate(filters):
cparams.filters[i] = filter.value if isinstance(filter, Enum) else filter
for i in range(len(filters), BLOSC2_MAX_FILTERS):
cparams.filters[i] = 0

filters_meta = cparams_dict.get('filters_meta', None)
codec = new_cparams.codec
cparams.compcode = codec if not isinstance(codec, blosc2.Codec) else codec.value
cparams.compcode_meta = new_cparams.codec_meta
cparams.clevel = new_cparams.clevel
cparams.use_dict = new_cparams.use_dict
cparams.typesize = new_cparams.typesize
cparams.nthreads = new_cparams.nthreads
cparams.blocksize = new_cparams.blocksize
cparams.splitmode = new_cparams.splitmode.value
cparams.tuner_id = new_cparams.tuner.value

filters = new_cparams.filters
for i, filter in enumerate(filters):
cparams.filters[i] = filter.value if isinstance(filter, Enum) else filter
for i in range(len(filters), BLOSC2_MAX_FILTERS):
cparams.filters[i] = 0

filters_meta = new_cparams.filters_meta
cdef int8_t meta_value
if filters_meta is not None:
for i, meta in enumerate(filters_meta):
# We still may want to encode negative values
meta_value = <int8_t> meta if meta < 0 else meta
cparams.filters_meta[i] = <uint8_t> meta_value
for i in range(len(filters_meta), BLOSC2_MAX_FILTERS):
cparams.filters_meta[i] = 0
for i, meta in enumerate(filters_meta):
# We still may want to encode negative values
meta_value = <int8_t> meta if meta < 0 else meta
cparams.filters_meta[i] = <uint8_t> meta_value
for i in range(len(filters_meta), BLOSC2_MAX_FILTERS):
cparams.filters_meta[i] = 0

_check_cparams(cparams)

Expand All @@ -1140,12 +1140,11 @@ cdef class SChunk:
self.schunk.filters_meta = self.schunk.storage.cparams.filters_meta

def get_dparams(self):
dparams_dict = {"nthreads": self.schunk.storage.dparams.nthreads}
return dparams_dict
return blosc2.DParams(nthreads=self.schunk.storage.dparams.nthreads)

def update_dparams(self, dparams_dict):
def update_dparams(self, new_dparams):
cdef blosc2_dparams* dparams = self.schunk.storage.dparams
dparams.nthreads = dparams_dict.get('nthreads', dparams.nthreads)
dparams.nthreads = new_dparams.nthreads

_check_dparams(dparams, self.schunk.storage.cparams)

Expand Down Expand Up @@ -1964,17 +1963,17 @@ def open(urlpath, mode, offset, **kwargs):
res = blosc2.NDArray(_schunk=PyCapsule_New(array.sc, <char *> "blosc2_schunk*", NULL),
_array=PyCapsule_New(array, <char *> "b2nd_array_t*", NULL))
if cparams is not None:
res.schunk.cparams = cparams
res.schunk.cparams = cparams if isinstance(cparams, blosc2.CParams) else blosc2.CParams(**cparams)
if dparams is not None:
res.schunk.dparams = dparams
res.schunk.dparams = dparams if isinstance(dparams, blosc2.DParams) else blosc2.DParams(**dparams)
res.schunk.mode = mode
else:
res = blosc2.SChunk(_schunk=PyCapsule_New(schunk, <char *> "blosc2_schunk*", NULL),
mode=mode, **kwargs)
if cparams is not None:
res.cparams = cparams
res.cparams = cparams if isinstance(cparams, blosc2.CParams) else blosc2.CParams(**cparams)
if dparams is not None:
res.dparams = dparams
res.dparams = dparams if isinstance(dparams, blosc2.DParams) else blosc2.DParams(**dparams)

return res

Expand Down
Loading
Loading