Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-46525: switch back to raising on partial output errors by default #309

Merged
merged 3 commits into from
Oct 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions doc/changes/DM-46525.misc.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Set the default for raising on partial output error to `True`.

Allowing processing to proceed when we encounter an error that may not be fatal is functionality we'll still want eventually, but enabling it by default was premature, since our processing-status reporting tools are yet able to distinguish these cases from unqualified successes.
3 changes: 2 additions & 1 deletion python/lsst/ctrl/mpexec/cli/opt/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -450,9 +450,10 @@
)

raise_on_partial_outputs_option = MWOptionDecorator(
"--raise-on-partial-outputs",
"--raise-on-partial-outputs/--no-raise-on-partial-outputs",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's cool, I didn't know you could do that.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I learned about it on this ticket, too!

help="Consider partial outputs from a task an error instead of a qualified success.",
is_flag=True,
default=True,
)

save_execution_butler_option = MWOptionDecorator(
Expand Down
2 changes: 1 addition & 1 deletion python/lsst/ctrl/mpexec/separablePipelineExecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ def __init__(
skip_existing_in: Iterable[str] | None = None,
task_factory: lsst.pipe.base.TaskFactory | None = None,
resources: lsst.pipe.base.ExecutionResources | None = None,
raise_on_partial_outputs: bool = False,
raise_on_partial_outputs: bool = True,
):
self._butler = Butler.from_config(butler=butler, collections=butler.collections, run=butler.run)
if not self._butler.collections:
Expand Down
10 changes: 5 additions & 5 deletions python/lsst/ctrl/mpexec/simple_pipeline_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ def __init__(
quantum_graph: QuantumGraph,
butler: Butler,
resources: ExecutionResources | None = None,
raise_on_partial_outputs: bool = False,
raise_on_partial_outputs: bool = True,
):
self.quantum_graph = quantum_graph
self.butler = butler
Expand Down Expand Up @@ -148,7 +148,7 @@ def from_pipeline_filename(
bind: Mapping[str, Any] | None = None,
butler: Butler,
resources: ExecutionResources | None = None,
raise_on_partial_outputs: bool = False,
raise_on_partial_outputs: bool = True,
) -> SimplePipelineExecutor:
"""Create an executor by building a QuantumGraph from an on-disk
pipeline YAML file.
Expand Down Expand Up @@ -201,7 +201,7 @@ def from_task_class(
bind: Mapping[str, Any] | None = None,
butler: Butler,
resources: ExecutionResources | None = None,
raise_on_partial_outputs: bool = False,
raise_on_partial_outputs: bool = True,
) -> SimplePipelineExecutor:
"""Create an executor by building a QuantumGraph from a pipeline
containing a single task.
Expand Down Expand Up @@ -268,7 +268,7 @@ def from_pipeline(
bind: Mapping[str, Any] | None = None,
butler: Butler,
resources: ExecutionResources | None = None,
raise_on_partial_outputs: bool = False,
raise_on_partial_outputs: bool = True,
) -> SimplePipelineExecutor:
"""Create an executor by building a QuantumGraph from an in-memory
pipeline.
Expand Down Expand Up @@ -321,7 +321,7 @@ def from_pipeline_graph(
bind: Mapping[str, Any] | None = None,
butler: Butler,
resources: ExecutionResources | None = None,
raise_on_partial_outputs: bool = False,
raise_on_partial_outputs: bool = True,
) -> SimplePipelineExecutor:
"""Create an executor by building a QuantumGraph from an in-memory
pipeline graph.
Expand Down
2 changes: 1 addition & 1 deletion python/lsst/ctrl/mpexec/singleQuantumExecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ def __init__(
resources: ExecutionResources | None = None,
skipExisting: bool = False,
assumeNoExistingOutputs: bool = False,
raise_on_partial_outputs: bool = False,
raise_on_partial_outputs: bool = True,
):
self.butler = butler
self.taskFactory = taskFactory
Expand Down
6 changes: 4 additions & 2 deletions tests/test_simple_pipeline_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,8 +249,10 @@ def test_partial_outputs_success(self):
pipeline_graph = PipelineGraph()
pipeline_graph.add_task("a", DynamicTestPipelineTask, config_a)
pipeline_graph.add_task("b", DynamicTestPipelineTask, config_b)
# Default behavior is to consider the partial a success and proceed.
executor = SimplePipelineExecutor.from_pipeline_graph(pipeline_graph, butler=self.butler)
# Consider the partial a success and proceed.
executor = SimplePipelineExecutor.from_pipeline_graph(
pipeline_graph, butler=self.butler, raise_on_partial_outputs=False
)
(_, _) = executor.as_generator(register_dataset_types=True)
self.assertFalse(self.butler.exists("intermediate"))
self.assertEqual(self.butler.get("output").storage_class, get_mock_name("StructuredDataDict"))
Expand Down
Loading