Single threaded alternative processor #773

Merged: 14 commits, Jun 10, 2024
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
@@ -16,7 +16,7 @@ repos:
args: [--safe, --line-length=100, --preview]
- id: black-jupyter
args: [--safe, --line-length=100, --preview]
language_version: python3.9
language_version: python3

- repo: https://github.com/pycqa/docformatter
rev: v1.7.5
1 change: 1 addition & 0 deletions strax/__init__.py
@@ -20,6 +20,7 @@

from .mailbox import *
from .processor import *
from .processors import *
from .context import *
from .run_selection import *
from .corrections import *
41 changes: 41 additions & 0 deletions strax/chunk.py
@@ -420,3 +420,44 @@ def _update_subruns_in_chunk(chunks):
else:
subruns[subrun_id] = subrun_start_end
return subruns


@export
class Rechunker:
    """Helper class for rechunking.

    Feed chunks in via receive(), which returns either None (nothing ready to
    send yet) or a chunk that is ready to be sent.

    Don't forget a final call to .flush() to get any remaining data out!

    """

    def __init__(self, rechunk=False, run_id=None):
        self.rechunk = rechunk
        self.is_superrun = run_id and run_id.startswith("_") and not run_id.startswith("__")
        self.run_id = run_id

        self.cache = None

    def receive(self, chunk):
        if self.is_superrun:
            chunk = strax.transform_chunk_to_superrun_chunk(self.run_id, chunk)
        if not self.rechunk:
            # We aren't rechunking
            return chunk
        if self.cache:
            # We have an old chunk, so we need to concatenate
            chunk = strax.Chunk.concatenate([self.cache, chunk])
        if chunk.data.nbytes >= chunk.target_size_mb * 1e6:
            # Enough data to send a new chunk!
            self.cache = None
            return chunk
        else:
            # Not enough data yet, so we cache the chunk
            self.cache = chunk
            return None

    def flush(self):
        result = self.cache
        self.cache = None
        return result
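
For orientation, a minimal usage sketch of the helper above; the `write` function, the empty `chunks` iterable, and the run id are hypothetical placeholders, not part of the patch:

import strax

def write(chunk):
    # Placeholder for whatever persists a chunk (e.g. a storage backend saver).
    print(f"would save {chunk}")

# Placeholder: in practice an iterable of strax.Chunk objects from one run.
chunks = []

rechunker = strax.Rechunker(rechunk=True, run_id="run_0001")
for chunk in chunks:
    out = rechunker.receive(chunk)
    if out is not None:
        # Enough data accumulated (or rechunking is off): persist it.
        write(out)

# Flush whatever is still cached once the run is exhausted.
last = rechunker.flush()
if last is not None:
    write(last)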
54 changes: 45 additions & 9 deletions strax/context.py
@@ -191,7 +191,11 @@ class Context:
_run_defaults_cache: dict
storage: ty.List[strax.StorageFrontend]

def __init__(self, storage=None, config=None, register=None, register_all=None, **kwargs):
processors: ty.Mapping[str, strax.BaseProcessor]

def __init__(
self, storage=None, config=None, register=None, register_all=None, processors=None, **kwargs
):
"""Create a strax context.

:param storage: Storage front-ends to use. Can be:
@@ -202,7 +206,9 @@ def __init__(self, storage=None, config=None, register=None, register_all=None,
applied to plugins
:param register: plugin class or list of plugin classes to register
:param register_all: module for which all plugin classes defined in it
will be registered.
will be registered.
:param processors: A mapping of processor names to classes to use for
data processing.
Any additional kwargs are considered Context-specific options; see
Context.takes_config.

@@ -226,12 +232,32 @@ def __init__(self, storage=None, config=None, register=None, register_all=None,
if register is not None:
self.register(register)

if processors is None:
processors = strax.PROCESSORS

if isinstance(processors, str):
processors = [processors]

if isinstance(processors, (list, tuple)):
ps = {}
for processor in processors:
if isinstance(processor, str) and processor in strax.PROCESSORS:
ps[processor] = strax.PROCESSORS[processor]
elif isinstance(processor, strax.BaseProcessor):
ps[processor.__name__] = processor
else:
raise ValueError(f"Unknown processor {processor}")
processors = ps

self.processors = processors

def new_context(
self,
storage=tuple(),
config=None,
register=None,
register_all=None,
processors=None,
replace=False,
**kwargs,
):
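
For illustration, a brief sketch of how a caller could use the new processors keyword when building a context; the registered name "single_thread" is an assumption here and may differ from the actual keys in strax.PROCESSORS:

import strax

# Default: processors falls back to strax.PROCESSORS; downstream code uses
# the first registered processor unless told otherwise.
st = strax.Context()

# Select processors by registered name (a single string is also accepted
# and wrapped into a list). "single_thread" is an assumed key.
st_single = strax.Context(processors=["single_thread"])

# Or pass an explicit mapping of names to processor classes.
st_mapped = strax.Context(processors=dict(strax.PROCESSORS))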
@@ -255,7 +281,7 @@ def new_context(
config = strax.combine_configs(self.config, config, mode="update")
kwargs = strax.combine_configs(self.context_config, kwargs, mode="update")

new_c = Context(storage=storage, config=config, **kwargs)
new_c = Context(storage=storage, config=config, processors=processors, **kwargs)
if not replace:
new_c._plugin_class_registry = self._plugin_class_registry.copy()
new_c.register_all(register_all)
@@ -1434,7 +1460,7 @@ def to_absolute_time_range(
def get_iter(
self,
run_id: str,
targets: ty.Union[ty.Tuple[str], ty.List[str]],
targets,
save=tuple(),
max_workers=None,
time_range=None,
@@ -1449,6 +1475,7 @@ def get_iter(
progress_bar=True,
multi_run_progress_bar=True,
_chunk_number=None,
processor=None,
**kwargs,
) -> ty.Iterator[strax.Chunk]:
"""Compute target for run_id and iterate over results.
@@ -1516,8 +1543,17 @@
if k.startswith("_temp"):
del self._plugin_class_registry[k]

if processor is None:
processor = list(self.processors)[0]

if isinstance(processor, str):
processor = self.processors[processor]

if not hasattr(processor, "iter"):
raise ValueError("Processors must implement an iter method.")

seen_a_chunk = False
generator = strax.ThreadedMailboxProcessor(
generator = processor(
components,
max_workers=max_workers,
allow_shm=self.context_config["allow_shm"],
@@ -2542,8 +2578,7 @@ def add_method(cls, f):
:param multi_run_progress_bar: Display a progress bar for loading multiple runs
"""

get_docs = (
"""
get_docs = """
:param run_id: run id to get
:param targets: list/tuple of strings of data type names to get
:param ignore_errors: Return the data for the runs that successfully loaded, even if some runs
Expand All @@ -2563,9 +2598,10 @@ def add_method(cls, f):
:param run_id_as_bytes: Boolean if true uses byte string instead of an
unicode string added to a multi-run array. This can save a lot of
memory when loading many runs.
:param processor: Name of the processor to use. If not specified, the
first processor from the context's processor list is used.
"""
+ select_docs
)
get_docs += select_docs

for attr in dir(Context):
attr_val = getattr(Context, attr)
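
Downstream, the same choice can be made per call when loading data; a hedged sketch of the intended call pattern, in which the run id, the "records" data type, and the "single_thread" processor name are all illustrative placeholders:

import strax

st = strax.Context(storage=[strax.DataDirectory("./strax_data")])

# Default behaviour: the first processor registered on the context is used,
# i.e. the existing threaded-mailbox pipeline.
for chunk in st.get_iter("run_0001", "records"):
    print(chunk)

# Explicitly request the alternative processor for this call only.
for chunk in st.get_iter("run_0001", "records", processor="single_thread"):
    print(chunk)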