diff --git a/aiida/backends/tests/workflows.py b/aiida/backends/tests/workflows.py index 714beca46a..78e6157319 100644 --- a/aiida/backends/tests/workflows.py +++ b/aiida/backends/tests/workflows.py @@ -152,6 +152,55 @@ def test_wf_get_state(self): # it is a valid state self.assertIn(wf.get_state(), wf_states) + def test_failing_calc_in_wf(self): + """ + This test checks that a workflow (but also a workflow with + sub-workflows) that has an exception at one of its steps stops + properly and it is not left as RUNNING. + """ + import logging + from aiida.daemon.workflowmanager import execute_steps + from aiida.workflows.test import (FailingWFTestSimple, + FailingWFTestSimpleWithSubWF) + + try: + # First of all, I re-enable logging in case it was disabled by + # mistake by a previous test (e.g. one that disables and reenables + # again, but that failed) + logging.disable(logging.NOTSET) + # Temporarily disable logging to the stream handler (i.e. screen) + # because otherwise fix_calc_states will print warnings + handler = next((h for h in logging.getLogger('aiida').handlers if + isinstance(h, logging.StreamHandler)), None) + if handler: + original_level = handler.level + handler.setLevel(logging.ERROR) + + # Testing the error propagation of a simple workflow + wf = FailingWFTestSimple() + wf.store() + step_no = 0 + wf.start() + while wf.is_running(): + execute_steps() + step_no += 1 + self.assertLess(step_no, 5, "This workflow should have stopped " + "since it is failing") + + # Testing the error propagation of a workflow with subworkflows + wf = FailingWFTestSimpleWithSubWF() + wf.store() + + step_no = 0 + wf.start() + while wf.is_running(): + execute_steps() + step_no += 1 + self.assertLess(step_no, 5, "This workflow should have stopped " + "since it is failing") + finally: + if handler: + handler.setLevel(original_level) def tearDown(self): """ diff --git a/aiida/orm/implementation/sqlalchemy/workflow.py b/aiida/orm/implementation/sqlalchemy/workflow.py index f65b175e13..0d8d01d16a 100644 --- a/aiida/orm/implementation/sqlalchemy/workflow.py +++ b/aiida/orm/implementation/sqlalchemy/workflow.py @@ -637,6 +637,7 @@ def wrapper(cls, *args, **kwargs): wrapped_method)) cls.append_to_report("full traceback: {0}".format(traceback.format_exc())) method_step.set_state(wf_states.ERROR) + cls.set_state(wf_states.ERROR) return None out = wrapper diff --git a/aiida/workflows/test.py b/aiida/workflows/test.py index babfdb1d60..d6e0389674 100644 --- a/aiida/workflows/test.py +++ b/aiida/workflows/test.py @@ -40,6 +40,54 @@ def second_step(self): self.next(self.exit) +class FailingWFTestSimple(WFTestSimple): + @Workflow.step + def start(self): + # Testing calculations + self.attach_calculation(self.generate_calc()) + + # Test process + self.next(self.second_step) + + @Workflow.step + def second_step(self): + # Testing calculations + self.attach_calculation(generate_calc()) + # Raise a test exception that should make the workflow to stop + raise Exception('Test exception') + + # Test process + self.next(self.third_step) + + @Workflow.step + def third_step(self): + self.next(self.exit) + + +class FailingWFTestSimpleWithSubWF(Workflow): + def __init__(self, **kwargs): + super(FailingWFTestSimpleWithSubWF, self).__init__(**kwargs) + + @Workflow.step + def start(self): + self.attach_calculation(generate_calc()) + + # Create two subworkflows + w = FailingWFTestSimple() + w.start() + self.attach_workflow(w) + + w = FailingWFTestSimple() + w.start() + self.attach_workflow(w) + + self.next(self.second) + + @Workflow.step + def second(self): + self.next(self.exit) + + class WFTestSimpleWithSubWF(Workflow): def __init__(self, **kwargs): super(WFTestSimpleWithSubWF, self).__init__(**kwargs)