Skip to content

Commit

Permalink
Set SingleThreadProcessor as the default processor (#904)
Browse files Browse the repository at this point in the history
* Set `SingleThreadProcessor` as the default processor

* Disallow multiple `data_kind`s processing with single thread processor

* Fix bug

* Fix bugs

* Debug
  • Loading branch information
dachengx authored Oct 16, 2024
1 parent c509e69 commit 2eda39d
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 24 deletions.
20 changes: 10 additions & 10 deletions strax/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -1545,6 +1545,15 @@ def get_iter(
# (otherwise potentially overwritten in temp-plugin)
targets_list = targets

if processor is None:
processor = list(self.processors)[0]

if isinstance(processor, str):
processor = self.processors[processor]

if not hasattr(processor, "iter"):
raise ValueError("Processors must implement a iter methed.")

is_superrun = run_id.startswith("_")

# If multiple targets of the same kind, create a MergeOnlyPlugin
Expand All @@ -1557,7 +1566,7 @@ def get_iter(
p = type(temp_name, (strax.MergeOnlyPlugin,), dict(depends_on=tuple(targets)))
self.register(p)
targets = (temp_name,)
elif not allow_multiple:
elif not allow_multiple or processor is strax.SingleThreadProcessor:
raise RuntimeError("Cannot automerge different data kinds!")
elif self.context_config["timeout"] > 7200 or (
self.context_config["allow_lazy"] and not self.context_config["allow_multiprocess"]
Expand All @@ -1582,15 +1591,6 @@ def get_iter(
if k.startswith("_temp"):
del self._plugin_class_registry[k]

if processor is None:
processor = list(self.processors)[0]

if isinstance(processor, str):
processor = self.processors[processor]

if not hasattr(processor, "iter"):
raise ValueError("Processors must implement a iter methed.")

seen_a_chunk = False
generator = processor(
components,
Expand Down
2 changes: 1 addition & 1 deletion strax/processors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from .single_thread import SingleThreadProcessor

PROCESSORS = {
"default": ThreadedMailboxProcessor,
"default": SingleThreadProcessor,
"threaded_mailbox": ThreadedMailboxProcessor,
"single_thread": SingleThreadProcessor,
}
6 changes: 4 additions & 2 deletions tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -426,13 +426,15 @@ def test_allow_multiple(targets=("peaks", "records")):
raise ValueError(f"{function} could run with allow_multiple")

try:
mystrax.make(run_id=run_id, targets=targets)
mystrax.make(run_id=run_id, targets=targets, processor="threaded_mailbox")
except RuntimeError:
# Great, we shouldn't be allowed
pass

assert not mystrax.is_stored(run_id, "peaks")
mystrax.make(run_id=run_id, allow_multiple=True, targets=targets)
mystrax.make(
run_id=run_id, allow_multiple=True, targets=targets, processor="threaded_mailbox"
)

for t in targets:
assert mystrax.is_stored(run_id, t)
Expand Down
1 change: 1 addition & 0 deletions tests/test_inline_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ def test_inline(self, **make_kwargs):
run_id,
targets,
allow_multiple=True,
processor="threaded_mailbox",
**make_kwargs,
)
for target in list(st._plugin_class_registry.keys()):
Expand Down
23 changes: 13 additions & 10 deletions tests/test_loop_plugins.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ def _loop_test_inner(
target="added_thing",
time_selection="fully_contained",
force_value_error=False,
processor=None,
):
"""Test loop plugins for random data. For this test we are going to setup two plugins that will
be looped over and combined into a loop plugin (depending on the target, this may be a multi
Expand Down Expand Up @@ -161,7 +162,7 @@ def compute_loop(self, big_kinda_data, small_kinda_data):
st.register((BigThing, SmallThing, AddBigToSmall, AddBigToSmallMultiOutput))

# Make small thing in order to allow re-chunking
st.make(run_id="some_run", targets="small_thing")
st.make(run_id="some_run", targets="small_thing", processor=processor)

# Make the loop plugin
result = st.get_array(run_id="some_run", targets=target)
Expand Down Expand Up @@ -212,16 +213,18 @@ def test_loop_plugin_multi_output(
)
@settings(deadline=None)
@example(big_data=np.array([], dtype=full_dt_dtype), nchunks=2)
def test_value_error_for_loop_plugin(big_data, nchunks):
def test_error_for_loop_plugin(big_data, nchunks):
"""Make sure that we are getting the right ValueError."""
try:
_loop_test_inner(big_data, nchunks, force_value_error=True)
raise RuntimeError(
"did not run into ValueError despite the fact we are having multiple none-type chunks"
)
except ValueError:
# Good we got the ValueError we wanted
pass
for error, processor in zip([RuntimeError, ValueError], ["single_thread", "threaded_mailbox"]):
try:
_loop_test_inner(big_data, nchunks, force_value_error=True, processor=processor)
raise RuntimeError(
f"Did not run into {error.__name__} despite the fact "
"we are having multiple none-type chunks"
)
except error:
# Good we got the Error we wanted
pass


@given(
Expand Down
4 changes: 3 additions & 1 deletion tests/test_mongo_frontend.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,9 @@ def test_allow_incomplete(self):

self.log.info(f"Starting with empty db {self.chunk_summary}")
# Get the iterator separately and complete with "next(iterator)"
iterator = self.st.get_iter(self.test_run_id, self.mongo_target)
iterator = self.st.get_iter(
self.test_run_id, self.mongo_target, processor="threaded_mailbox"
)

self.log.info(f"Got iterator, still no data?: {self.chunk_summary}")
# Chunk 0
Expand Down

0 comments on commit 2eda39d

Please sign in to comment.