diff --git a/src/blosc2/__init__.py b/src/blosc2/__init__.py index 0672bb35..24b6f96e 100644 --- a/src/blosc2/__init__.py +++ b/src/blosc2/__init__.py @@ -188,15 +188,23 @@ class Tuner(Enum): unpack_tensor, ) +# Internal Blosc threading +# Get CPU info +cpu_info = get_cpu_info() +nthreads = ncores = cpu_info.get("count", 1) +"""Number of threads to be used in compression/decompression. +""" +# Protection against too many threads +nthreads = min(nthreads, 32) +# Experiments say that, when using a large number of threads, it is better to not use them all +nthreads -= nthreads // 8 + # This import must be before ndarray and schunk from .storage import ( CParams, cparams_dflts, - cpu_info, DParams, dparams_dflts, - ncores, - nthreads, Storage, storage_dflts, ) @@ -254,13 +262,6 @@ class Tuner(Enum): The blosc2 version + date. """ -# Internal Blosc threading -set_nthreads(nthreads) - -# Set the number of threads for NumExpr -numexpr.set_num_threads(nthreads) - -_disable_overloaded_equal = False # Delayed imports for avoiding overwriting of python builtins from .ndarray import ( diff --git a/src/blosc2/blosc2_ext.pyx b/src/blosc2/blosc2_ext.pyx index b44b9284..6358186c 100644 --- a/src/blosc2/blosc2_ext.pyx +++ b/src/blosc2/blosc2_ext.pyx @@ -726,7 +726,7 @@ cdef create_cparams_from_kwargs(blosc2_cparams *cparams, kwargs): cparams.clevel = kwargs.get('clevel', blosc2.cparams_dflts['clevel']) cparams.use_dict = kwargs.get('use_dict', blosc2.cparams_dflts['use_dict']) cparams.typesize = typesize = kwargs.get('typesize', blosc2.cparams_dflts['typesize']) - cparams.nthreads = kwargs.get('nthreads', blosc2.cparams_dflts['nthreads']) + cparams.nthreads = kwargs.get('nthreads', blosc2.nthreads) cparams.blocksize = kwargs.get('blocksize', blosc2.cparams_dflts['blocksize']) splitmode = kwargs.get('splitmode', blosc2.cparams_dflts['splitmode']) cparams.splitmode = splitmode.value @@ -807,7 +807,7 @@ def compress2(src, **kwargs): return dest[:size] cdef create_dparams_from_kwargs(blosc2_dparams *dparams, kwargs, blosc2_cparams* cparams=NULL): - dparams.nthreads = kwargs.get('nthreads', blosc2.dparams_dflts['nthreads']) + dparams.nthreads = kwargs.get('nthreads', blosc2.nthreads) dparams.schunk = NULL dparams.postfilter = NULL dparams.postparams = NULL diff --git a/src/blosc2/core.py b/src/blosc2/core.py index 2f5ab25b..7eb2da54 100644 --- a/src/blosc2/core.py +++ b/src/blosc2/core.py @@ -894,8 +894,9 @@ def set_nthreads(nthreads: int) -> int: -------- :attr:`~blosc2.nthreads` """ + rc = blosc2_ext.set_nthreads(nthreads) blosc2.nthreads = nthreads - return blosc2_ext.set_nthreads(nthreads) + return rc def compressor_list(plugins: bool = False) -> list: diff --git a/src/blosc2/storage.py b/src/blosc2/storage.py index 6d1ba296..9abf126b 100644 --- a/src/blosc2/storage.py +++ b/src/blosc2/storage.py @@ -11,20 +11,9 @@ import blosc2 -# Internal Blosc threading -# Get CPU info -cpu_info = blosc2.get_cpu_info() -nthreads = ncores = cpu_info.get("count", 1) -"""Number of threads to be used in compression/decompression. -""" -# Protection against too many threads -nthreads = min(nthreads, 32) -# Experiments say that, when using a large number of threads, it is better to not use them all -nthreads -= nthreads // 8 - def default_nthreads(): - return nthreads + return blosc2.nthreads def default_filters(): return [blosc2.Filter.NOFILTER, diff --git a/tests/test_storage.py b/tests/test_storage.py index e320da25..93b64158 100644 --- a/tests/test_storage.py +++ b/tests/test_storage.py @@ -98,15 +98,21 @@ def test_cparams_values(cparams): array = blosc2.empty((30, 30), np.int32, cparams=cparams) for field in fields(cparams_dataclass): - print(field.name) if field.name in ['filters', 'filters_meta']: - print(getattr(array.schunk.cparams, field.name)) assert getattr(array.schunk.cparams, field.name)[:len(getattr(cparams_dataclass, field.name))] == getattr(cparams_dataclass, field.name) elif field.name == 'typesize': assert getattr(array.schunk.cparams, field.name) == array.dtype.itemsize elif field.name != 'blocksize': assert getattr(array.schunk.cparams, field.name) == getattr(cparams_dataclass, field.name) + blosc2.set_nthreads(10) + schunk = blosc2.SChunk(cparams=cparams) + cparams_dataclass = cparams if isinstance(cparams, blosc2.CParams) else blosc2.CParams(**cparams) + assert schunk.cparams.nthreads == cparams_dataclass.nthreads + + array = blosc2.empty((30, 30), np.int32, cparams=cparams) + assert array.schunk.cparams.nthreads == cparams_dataclass.nthreads + def test_cparams_defaults(): cparams = blosc2.CParams() @@ -122,6 +128,10 @@ def test_cparams_defaults(): assert cparams.blocksize == 0 assert cparams.tuner == blosc2.Tuner.STUNE + blosc2.set_nthreads(1) + cparams = blosc2.CParams() + assert cparams.nthreads == blosc2.nthreads + def test_raises_cparams(): cparams = blosc2.CParams(codec=blosc2.Codec.LZ4, clevel=6, typesize=4) @@ -151,11 +161,21 @@ def test_dparams_values(dparams): assert getattr(schunk.dparams, field.name) == getattr(dparams_dataclass, field.name) assert getattr(array.schunk.dparams, field.name) == getattr(dparams_dataclass, field.name) + blosc2.set_nthreads(3) + schunk = blosc2.SChunk(dparams=dparams) + dparams_dataclass = dparams if isinstance(dparams, blosc2.DParams) else blosc2.DParams(**dparams) + array = blosc2.empty((30, 30), dparams=dparams) + assert schunk.dparams.nthreads == dparams_dataclass.nthreads + assert array.schunk.dparams.nthreads == dparams_dataclass.nthreads def test_dparams_defaults(): dparams = blosc2.DParams() assert dparams.nthreads == blosc2.nthreads + blosc2.set_nthreads(1) + dparams = blosc2.DParams() + assert dparams.nthreads == blosc2.nthreads + def test_raises_dparams(): dparams = blosc2.DParams()