Skip to content

Commit

Permalink
Merge pull request #54 from wxtim/fix-workflow-state-check.tests3
Browse files Browse the repository at this point in the history
more tests
  • Loading branch information
hjoliver authored Jun 14, 2024
2 parents 6c14e7f + f1480cc commit 0a5921c
Show file tree
Hide file tree
Showing 3 changed files with 245 additions and 57 deletions.
120 changes: 63 additions & 57 deletions tests/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -558,8 +558,12 @@ def _submit_task_jobs(*args, **kwargs):
return _reflog


@pytest.fixture
def complete():
async def _complete(
schd,
*tokens_list: Union[Tokens, str],
stop_mode=StopMode.AUTO,
timeout: int = 60,
) -> None:
"""Wait for the workflow, or tasks within it to complete.
Args:
Expand All @@ -584,65 +588,67 @@ def complete():
async_timeout (handles shutdown logic more cleanly).
"""
async def _complete(
schd,
*tokens_list: Union[Tokens, str],
stop_mode=StopMode.AUTO,
timeout: int = 60,
) -> None:
start_time = time()

_tokens_list: List[Tokens] = []
for tokens in tokens_list:
if isinstance(tokens, str):
tokens = Tokens(tokens, relative=True)
_tokens_list.append(tokens.task)

# capture task completion
remove_if_complete = schd.pool.remove_if_complete

def _remove_if_complete(itask, output=None):
nonlocal _tokens_list
ret = remove_if_complete(itask)
if ret and itask.tokens.task in _tokens_list:
_tokens_list.remove(itask.tokens.task)
return ret

schd.pool.remove_if_complete = _remove_if_complete

# capture workflow shutdown
set_stop = schd._set_stop
has_shutdown = False

def _set_stop(mode=None):
nonlocal has_shutdown, stop_mode
if mode == stop_mode:
has_shutdown = True
return set_stop(mode)
else:
set_stop(mode)
raise Exception(f'Workflow bailed with stop mode = {mode}')

schd._set_stop = _set_stop

# determine the completion condition
if _tokens_list:
condition = lambda: bool(_tokens_list)
start_time = time()

_tokens_list: List[Tokens] = []
for tokens in tokens_list:
if isinstance(tokens, str):
tokens = Tokens(tokens, relative=True)
_tokens_list.append(tokens.task)

# capture task completion
remove_if_complete = schd.pool.remove_if_complete

def _remove_if_complete(itask, output=None):
nonlocal _tokens_list
ret = remove_if_complete(itask)
if ret and itask.tokens.task in _tokens_list:
_tokens_list.remove(itask.tokens.task)
return ret

schd.pool.remove_if_complete = _remove_if_complete

# capture workflow shutdown
set_stop = schd._set_stop
has_shutdown = False

def _set_stop(mode=None):
nonlocal has_shutdown, stop_mode
if mode == stop_mode:
has_shutdown = True
return set_stop(mode)
else:
condition = lambda: bool(not has_shutdown)
set_stop(mode)
raise Exception(f'Workflow bailed with stop mode = {mode}')

schd._set_stop = _set_stop

# determine the completion condition
if _tokens_list:
condition = lambda: bool(_tokens_list)
else:
condition = lambda: bool(not has_shutdown)

# wait for the condition to be met
while condition():
# allow the main loop to advance
await asyncio.sleep(0)
if (time() - start_time) > timeout:
raise Exception(
f'Timeout waiting for {", ".join(map(str, _tokens_list))}'
)

# restore regular shutdown logic
schd._set_stop = set_stop


# wait for the condition to be met
while condition():
# allow the main loop to advance
await asyncio.sleep(0)
if (time() - start_time) > timeout:
raise Exception(
f'Timeout waiting for {", ".join(map(str, _tokens_list))}'
)
@pytest.fixture
def complete():
return _complete

# restore regular shutdown logic
schd._set_stop = set_stop

@pytest.fixture(scope='module')
def mod_complete():
return _complete


Expand Down
139 changes: 139 additions & 0 deletions tests/integration/test_dbstatecheck.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""Tests for the backend method of workflow_state"""


from asyncio import sleep
import pytest
from textwrap import dedent
from typing import TYPE_CHECKING

from cylc.flow.dbstatecheck import CylcWorkflowDBChecker as Checker


if TYPE_CHECKING:
from cylc.flow.dbstatecheck import CylcWorkflowDBChecker


@pytest.fixture(scope='module')
async def checker(
mod_flow, mod_scheduler, mod_run, mod_complete
) -> 'CylcWorkflowDBChecker':
"""Make a real world database.
We could just write the database manually but this is a better
test of the overall working of the function under test.
"""
wid = mod_flow({
'scheduling': {
'graph': {'P1Y': dedent('''
good:succeeded
bad:failed?
output:custom_output
''')},
'initial cycle point': '1000',
'final cycle point': '1001'
},
'runtime': {
'bad': {'simulation': {'fail cycle points': '1000'}},
'output': {'outputs': {'trigger': 'message'}}
}
})
schd = mod_scheduler(wid, paused_start=False)
async with mod_run(schd):
await mod_complete(schd)
schd.pool.force_trigger_tasks(['1000/good'], [2])
# Allow a cycle of the main loop to pass so that flow 2 can be
# added to db
await sleep(1)
yield Checker(
'somestring', 'utterbunkum',
schd.workflow_db_mgr.pub_path
)


def test_basic(checker):
"""Pass no args, get unfiltered output"""
result = checker.workflow_state_query()
expect = [
['bad', '10000101T0000Z', 'failed'],
['bad', '10010101T0000Z', 'succeeded'],
['good', '10000101T0000Z', 'succeeded'],
['good', '10010101T0000Z', 'succeeded'],
['output', '10000101T0000Z', 'succeeded'],
['output', '10010101T0000Z', 'succeeded'],
['good', '10000101T0000Z', 'waiting', '(flows=2)'],
]
assert result == expect


def test_task(checker):
"""Filter by task name"""
result = checker.workflow_state_query(task='bad')
assert result == [
['bad', '10000101T0000Z', 'failed'],
['bad', '10010101T0000Z', 'succeeded']
]


def test_point(checker):
"""Filter by point"""
result = checker.workflow_state_query(cycle='10000101T0000Z')
assert result == [
['bad', '10000101T0000Z', 'failed'],
['good', '10000101T0000Z', 'succeeded'],
['output', '10000101T0000Z', 'succeeded'],
['good', '10000101T0000Z', 'waiting', '(flows=2)'],
]


def test_status(checker):
"""Filter by status"""
result = checker.workflow_state_query(selector='failed')
expect = [
['bad', '10000101T0000Z', 'failed'],
]
assert result == expect


def test_output(checker):
"""Filter by flow number"""
result = checker.workflow_state_query(selector='message', is_message=True)
expect = [
[
'output',
'10000101T0000Z',
"{'submitted': 'submitted', 'started': 'started', 'succeeded': "
"'succeeded', 'trigger': 'message'}",
],
[
'output',
'10010101T0000Z',
"{'submitted': 'submitted', 'started': 'started', 'succeeded': "
"'succeeded', 'trigger': 'message'}",
],
]
assert result == expect


def test_flownum(checker):
"""Pass no args, get unfiltered output"""
result = checker.workflow_state_query(flow_num=2)
expect = [
['good', '10000101T0000Z', 'waiting', '(flows=2)'],
]
assert result == expect
43 changes: 43 additions & 0 deletions tests/integration/test_xtrigger_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,3 +188,46 @@ def mytrig(*args, **kwargs):

# check the DB to ensure no additional entries have been created
assert db_select(schd, True, 'xtriggers') == db_xtriggers


async def test_error_in_xtrigger(flow, start, scheduler):
"""Failure in an xtrigger is handled nicely.
"""
id_ = flow({
'scheduler': {
'allow implicit tasks': 'True'
},
'scheduling': {
'xtriggers': {
'mytrig': 'mytrig()'
},
'graph': {
'R1': '@mytrig => foo'
},
}
})

# add a custom xtrigger to the workflow
run_dir = Path(get_workflow_run_dir(id_))
xtrig_dir = run_dir / 'lib/python'
xtrig_dir.mkdir(parents=True)
(xtrig_dir / 'mytrig.py').write_text(dedent('''
def mytrig(*args, **kwargs):
raise Exception('This Xtrigger is broken')
'''))

schd = scheduler(id_)
async with start(schd) as log:
foo = schd.pool.get_tasks()[0]
schd.xtrigger_mgr.call_xtriggers_async(foo)
for _ in range(50):
await asyncio.sleep(0.1)
schd.proc_pool.process()
if len(schd.proc_pool.runnings) == 0:
break
else:
raise Exception('Process pool did not clear')

error = log.messages[-1].split('\n')
assert error[-2] == 'Exception: This Xtrigger is broken'
assert error[0] == 'ERROR in xtrigger mytrig()'

0 comments on commit 0a5921c

Please sign in to comment.