Skip to content

Commit

Permalink
Always use blosc2.nthreads as default value
Browse files Browse the repository at this point in the history
  • Loading branch information
martaiborra committed Sep 24, 2024
1 parent 83e9026 commit 68e8a60
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 27 deletions.
21 changes: 11 additions & 10 deletions src/blosc2/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,15 +188,23 @@ class Tuner(Enum):
unpack_tensor,
)

# Internal Blosc threading
# Get CPU info
cpu_info = get_cpu_info()
# Seed both names from the "count" entry of the CPU info, falling back to 1
# when that entry is missing. `ncores` is not modified below, so it keeps
# this raw value; only `nthreads` is adjusted.
nthreads = ncores = cpu_info.get("count", 1)
"""Number of threads to be used in compression/decompression.
"""
# Protection against too many threads: the default never exceeds 32
nthreads = min(nthreads, 32)
# Experiments say that, when using a large number of threads, it is better not to use them all:
# hold back one thread out of every eight (e.g. 8 -> 7, 16 -> 14, 32 -> 28).
# Values below 8 are left unchanged because the integer division yields 0.
nthreads -= nthreads // 8

# This import must be before ndarray and schunk
from .storage import (
CParams,
cparams_dflts,
cpu_info,
DParams,
dparams_dflts,
ncores,
nthreads,
Storage,
storage_dflts,
)
Expand Down Expand Up @@ -254,13 +262,6 @@ class Tuner(Enum):
The blosc2 version + date.
"""

# Internal Blosc threading
set_nthreads(nthreads)

# Set the number of threads for NumExpr
numexpr.set_num_threads(nthreads)

_disable_overloaded_equal = False

# Delayed imports for avoiding overwriting of python builtins
from .ndarray import (
Expand Down
4 changes: 2 additions & 2 deletions src/blosc2/blosc2_ext.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -726,7 +726,7 @@ cdef create_cparams_from_kwargs(blosc2_cparams *cparams, kwargs):
cparams.clevel = kwargs.get('clevel', blosc2.cparams_dflts['clevel'])
cparams.use_dict = kwargs.get('use_dict', blosc2.cparams_dflts['use_dict'])
cparams.typesize = typesize = kwargs.get('typesize', blosc2.cparams_dflts['typesize'])
cparams.nthreads = kwargs.get('nthreads', blosc2.cparams_dflts['nthreads'])
cparams.nthreads = kwargs.get('nthreads', blosc2.nthreads)
cparams.blocksize = kwargs.get('blocksize', blosc2.cparams_dflts['blocksize'])
splitmode = kwargs.get('splitmode', blosc2.cparams_dflts['splitmode'])
cparams.splitmode = splitmode.value
Expand Down Expand Up @@ -807,7 +807,7 @@ def compress2(src, **kwargs):
return dest[:size]

cdef create_dparams_from_kwargs(blosc2_dparams *dparams, kwargs, blosc2_cparams* cparams=NULL):
dparams.nthreads = kwargs.get('nthreads', blosc2.dparams_dflts['nthreads'])
dparams.nthreads = kwargs.get('nthreads', blosc2.nthreads)
dparams.schunk = NULL
dparams.postfilter = NULL
dparams.postparams = NULL
Expand Down
3 changes: 2 additions & 1 deletion src/blosc2/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -894,8 +894,9 @@ def set_nthreads(nthreads: int) -> int:
--------
:attr:`~blosc2.nthreads`
"""
rc = blosc2_ext.set_nthreads(nthreads)
blosc2.nthreads = nthreads
return blosc2_ext.set_nthreads(nthreads)
return rc


def compressor_list(plugins: bool = False) -> list:
Expand Down
13 changes: 1 addition & 12 deletions src/blosc2/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,9 @@

import blosc2

# Internal Blosc threading
# Get CPU info
cpu_info = blosc2.get_cpu_info()
nthreads = ncores = cpu_info.get("count", 1)
"""Number of threads to be used in compression/decompression.
"""
# Protection against too many threads
nthreads = min(nthreads, 32)
# Experiments say that, when using a large number of threads, it is better to not use them all
nthreads -= nthreads // 8


def default_nthreads():
return nthreads
return blosc2.nthreads

def default_filters():
return [blosc2.Filter.NOFILTER,
Expand Down
24 changes: 22 additions & 2 deletions tests/test_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,15 +98,21 @@ def test_cparams_values(cparams):

array = blosc2.empty((30, 30), np.int32, cparams=cparams)
for field in fields(cparams_dataclass):
print(field.name)
if field.name in ['filters', 'filters_meta']:
print(getattr(array.schunk.cparams, field.name))
assert getattr(array.schunk.cparams, field.name)[:len(getattr(cparams_dataclass, field.name))] == getattr(cparams_dataclass, field.name)
elif field.name == 'typesize':
assert getattr(array.schunk.cparams, field.name) == array.dtype.itemsize
elif field.name != 'blocksize':
assert getattr(array.schunk.cparams, field.name) == getattr(cparams_dataclass, field.name)

blosc2.set_nthreads(10)
schunk = blosc2.SChunk(cparams=cparams)
cparams_dataclass = cparams if isinstance(cparams, blosc2.CParams) else blosc2.CParams(**cparams)
assert schunk.cparams.nthreads == cparams_dataclass.nthreads

array = blosc2.empty((30, 30), np.int32, cparams=cparams)
assert array.schunk.cparams.nthreads == cparams_dataclass.nthreads


def test_cparams_defaults():
cparams = blosc2.CParams()
Expand All @@ -122,6 +128,10 @@ def test_cparams_defaults():
assert cparams.blocksize == 0
assert cparams.tuner == blosc2.Tuner.STUNE

blosc2.set_nthreads(1)
cparams = blosc2.CParams()
assert cparams.nthreads == blosc2.nthreads


def test_raises_cparams():
cparams = blosc2.CParams(codec=blosc2.Codec.LZ4, clevel=6, typesize=4)
Expand Down Expand Up @@ -151,11 +161,21 @@ def test_dparams_values(dparams):
assert getattr(schunk.dparams, field.name) == getattr(dparams_dataclass, field.name)
assert getattr(array.schunk.dparams, field.name) == getattr(dparams_dataclass, field.name)

blosc2.set_nthreads(3)
schunk = blosc2.SChunk(dparams=dparams)
dparams_dataclass = dparams if isinstance(dparams, blosc2.DParams) else blosc2.DParams(**dparams)
array = blosc2.empty((30, 30), dparams=dparams)
assert schunk.dparams.nthreads == dparams_dataclass.nthreads
assert array.schunk.dparams.nthreads == dparams_dataclass.nthreads

def test_dparams_defaults():
    """Check that ``DParams()`` defaults ``nthreads`` to ``blosc2.nthreads``.

    The check is done twice: once with whatever thread count is currently
    configured, and once after changing it through ``blosc2.set_nthreads``.
    The original thread count is restored afterwards so this test does not
    change the global configuration seen by tests that run after it.
    """
    dparams = blosc2.DParams()
    assert dparams.nthreads == blosc2.nthreads

    # set_nthreads() mutates process-wide state; remember it for the restore.
    previous_nthreads = blosc2.nthreads
    try:
        blosc2.set_nthreads(1)
        dparams = blosc2.DParams()
        assert dparams.nthreads == blosc2.nthreads
    finally:
        # Restore even when an assertion fails, to avoid polluting other tests.
        blosc2.set_nthreads(previous_nthreads)


def test_raises_dparams():
dparams = blosc2.DParams()
Expand Down

0 comments on commit 68e8a60

Please sign in to comment.