Skip to content

Commit

Permalink
Add retrieve job logs * settings to platform clash check
Browse files Browse the repository at this point in the history
  • Loading branch information
MetRonnie committed Jul 19, 2022
1 parent f3aeb4e commit 2a34672
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 62 deletions.
8 changes: 5 additions & 3 deletions cylc/flow/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -1306,9 +1306,11 @@ def configure_sim_modes(self):

# Dummy mode jobs should run on platform localhost
# All Cylc 7 config items which conflict with platform are removed.
for section, key, _ in FORBIDDEN_WITH_PLATFORM:
if (section in rtc and key in rtc[section]):
rtc[section][key] = None
for section, keys in FORBIDDEN_WITH_PLATFORM.items():
if section in rtc:
for key in keys:
if key in rtc[section]:
rtc[section][key] = None

rtc['platform'] = 'localhost'

Expand Down
106 changes: 60 additions & 46 deletions cylc/flow/platforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import re
from copy import deepcopy
from typing import (
Any, Dict, Iterable, List, Optional, Tuple, Set, Union, overload
TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Set, Union, overload
)

from cylc.flow import LOG
Expand All @@ -31,13 +31,24 @@
from cylc.flow.cfgspec.glbl_cfg import glbl_cfg
from cylc.flow.hostuserutil import is_remote_host

if TYPE_CHECKING:
from cylc.flow.parsec.OrderedDict import OrderedDictWithDefaults

UNKNOWN_TASK = 'unknown task'

FORBIDDEN_WITH_PLATFORM: Tuple[Tuple[str, str, List[Optional[str]]], ...] = (
('remote', 'host', ['localhost', None]),
('job', 'batch system', [None]),
('job', 'batch submit command template', [None])
)
FORBIDDEN_WITH_PLATFORM: Dict[str, Dict[str, List[Optional[str]]]] = {
'remote': {
# setting: exclusions
'host': ['localhost', None],
'retrieve job logs': [None],
'retrieve job logs max size': [None],
'retrieve job logs retry delays': [None],
},
'job': {
'batch system': [None],
'batch submit command template': [None]
}
}

DEFAULT_JOB_RUNNER = 'background'
SINGLE_HOST_JOB_RUNNERS = ['background', 'at']
Expand Down Expand Up @@ -77,7 +88,7 @@ def get_platform(

@overload
def get_platform(
task_conf: Dict[str, Any],
task_conf: Union[dict, 'OrderedDictWithDefaults'],
task_id: str = UNKNOWN_TASK,
bad_hosts: Optional[Set[str]] = None
) -> Optional[Dict[str, Any]]:
Expand All @@ -93,7 +104,7 @@ def get_platform(
# remove at:
# Cylc8.x
def get_platform(
task_conf: Union[str, Dict[str, Any], None] = None,
task_conf: Union[str, dict, 'OrderedDictWithDefaults', None] = None,
task_id: str = UNKNOWN_TASK,
bad_hosts: Optional[Set[str]] = None
) -> Optional[Dict[str, Any]]:
Expand All @@ -117,6 +128,8 @@ def get_platform(
# task_conf is a platform name, or get localhost if None
return platform_from_name(task_conf, bad_hosts=bad_hosts)

# NOTE: Do NOT use .get() on OrderedDictWithDefaults -
# https://github.com/cylc/cylc-flow/pull/4975
elif 'platform' in task_conf and task_conf['platform']:
# Check whether task has conflicting Cylc7 items.
fail_if_platform_and_host_conflict(task_conf, task_id)
Expand All @@ -136,8 +149,7 @@ def get_platform(
return platform_from_name()
else:
# Need to calculate platform
task_job_section: Dict[Any, Any] = {}
task_remote_section: Dict[Any, Any] = {}
# NOTE: Do NOT use .get() on OrderedDictWithDefaults - see above
task_job_section = task_conf['job'] if 'job' in task_conf else {}
task_remote_section = (
task_conf['remote'] if 'remote' in task_conf else {})
Expand Down Expand Up @@ -240,7 +252,7 @@ def platform_from_name(


def get_platform_from_group(
group: Dict[str, Any],
group: Union[dict, 'OrderedDictWithDefaults'],
group_name: str,
bad_hosts: Optional[Set[str]] = None
) -> str:
Expand Down Expand Up @@ -289,7 +301,7 @@ def get_platform_from_group(


def platform_name_from_job_info(
platforms: Dict[str, Any],
platforms: Union[dict, 'OrderedDictWithDefaults'],
job: Dict[str, Any],
remote: Dict[str, Any]
) -> str:
Expand Down Expand Up @@ -380,6 +392,9 @@ def platform_name_from_job_info(
# - In the case of "host" we also want a regex match to the platform name
# - In the case of "batch system" we want to match the name of the
# system/job runner to a platform when host is localhost.

# NOTE: Do NOT use .get() on OrderedDictWithDefaults -
# https://github.com/cylc/cylc-flow/pull/4975
if 'host' in remote and remote['host']:
task_host = remote['host']
else:
Expand All @@ -389,6 +404,7 @@ def platform_name_from_job_info(
else:
# Necessary? Perhaps not if batch system default is 'background'
task_job_runner = 'background'

# Riffle through the platforms looking for a match to our task settings.
# reverse dict order so that user config platforms added last are examined
# before site config platforms.
Expand Down Expand Up @@ -509,42 +525,38 @@ def get_host_from_platform(
return HOST_SELECTION_METHODS[method](goodhosts)


def fail_if_platform_and_host_conflict(task_conf, task_name):
"""Raise an error if task spec contains platform and forbidden host items.
def fail_if_platform_and_host_conflict(
task_conf: Union[dict, 'OrderedDictWithDefaults'], task_name: str
) -> None:
"""Raise an error if [runtime][<task>] spec contains platform and
forbidden host items.
Args:
task_conf(dict, OrderedDictWithDefaults):
A specification to be checked.
task_name(string):
A name to be given in an error.
task_conf: A specification to be checked.
task_name: A name to be given in an error.
Raises:
PlatformLookupError - if platform and host items conflict
"""
# NOTE: Do NOT use .get() on OrderedDictWithDefaults -
# https://github.com/cylc/cylc-flow/pull/4975
if 'platform' in task_conf and task_conf['platform']:
fail_items = ''
for section, key, _ in FORBIDDEN_WITH_PLATFORM:
if (
section in task_conf and
key in task_conf[section] and
task_conf[section][key] is not None
):
fail_items += (
f' * platform = {task_conf["platform"]} AND'
f' [{section}]{key} = {task_conf[section][key]}\n'
)
if fail_items != '':
fail_items = [
f"\n * [[[{section}]]]" for section in FORBIDDEN_WITH_PLATFORM
if section in task_conf and task_conf[section]
]
if fail_items:
raise PlatformLookupError(
f"A mixture of Cylc 7 (host) and Cylc 8 (platform) "
f"logic should not be used. In this case the task "
f"\"{task_name}\" has the following settings which "
f"are not compatible:\n{fail_items}"
f"Task '{task_name}' has the following deprecated '[runtime]' "
"section(s) which cannot be used with "
f"'platform = {task_conf['platform']}':{''.join(fail_items)}"
)


def get_platform_deprecated_settings(
task_conf: Dict[str, Any], task_name: str = UNKNOWN_TASK
task_conf: Union[dict, 'OrderedDictWithDefaults'],
task_name: str = UNKNOWN_TASK
) -> List[str]:
"""Return deprecated [runtime][<task_name>] settings that should be
upgraded to platforms.
Expand All @@ -553,18 +565,16 @@ def get_platform_deprecated_settings(
task_conf: Runtime configuration for the task.
task_name: The task name.
"""
result: List[str] = []
for section, key, exceptions in FORBIDDEN_WITH_PLATFORM:
return [
f'[runtime][{task_name}][{section}]{key} = {task_conf[section][key]}'
for section, keys in FORBIDDEN_WITH_PLATFORM.items()
if section in task_conf
for key, exclusions in keys.items()
if (
section in task_conf and
key in task_conf[section] and
task_conf[section][key] not in exceptions
):
result.append(
f'[runtime][{task_name}][{section}]{key} = '
f'{task_conf[section][key]}'
)
return result
task_conf[section][key] not in exclusions
)
]


def is_platform_definition_subshell(value: str) -> bool:
Expand Down Expand Up @@ -664,14 +674,18 @@ def get_localhost_install_target() -> str:
return get_install_target_from_platform(localhost)


def _validate_single_host(platforms_cfg) -> None:
def _validate_single_host(
platforms_cfg: Union[dict, 'OrderedDictWithDefaults']
) -> None:
"""Check that single-host platforms only specify a single host.
Some job runners don't work across multiple hosts; the job ID is only valid
on the specific submission host.
"""
bad_platforms = []
runners = set()
name: str
config: dict
for name, config in platforms_cfg.items():
runner = config.get('job runner', DEFAULT_JOB_RUNNER)
hosts = config.get('hosts', [])
Expand Down
25 changes: 20 additions & 5 deletions tests/unit/test_platforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -555,29 +555,44 @@ def test_get_all_platforms_for_install_target(mock_glbl_cfg):
@pytest.mark.parametrize(
'task_conf, expected',
[
(
pytest.param(
{
'remote': {'host': 'cylcdevbox'},
'remote': {
'host': 'cylcdevbox',
'retrieve job logs': True
},
'job': {
'batch system': 'pbs',
'batch submit command template': 'meow'
}
},
[
'[runtime][task][job]batch submit command template = meow',
'[runtime][task][remote]retrieve job logs = True',
'[runtime][task][remote]host = cylcdevbox',
'[runtime][task][job]batch system = pbs'
]
],
id="All are deprecated settings"
),
(
pytest.param(
{
'remote': {'host': 'localhost'},
'job': {
'batch system': 'pbs',
'batch submit command template': None
}
},
['[runtime][task][job]batch system = pbs']
['[runtime][task][job]batch system = pbs'],
id="Exclusions are excluded"
),
pytest.param(
{
'environment filter': {
'include': ['frodo', 'sam']
}
},
[],
id="No deprecated settings"
)
]
)
Expand Down
30 changes: 22 additions & 8 deletions tests/unit/test_platforms_get_platform.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,19 +86,33 @@ def test_get_platform_from_platform_name_str(mock_glbl_cfg, platform_re):
assert platform['job runner'] == 'slurm'


def test_get_platform_cylc7_8_syntax_mix_fails(mock_glbl_cfg):
@pytest.mark.parametrize(
'task_conf',
[
{
'platform': 'localhost',
'remote': {
'host': 'localhost'
}
},
{
'platform': 'gondor',
'remote': {
'retrieve job logs': False
}
}
]
)
def test_get_platform_cylc7_8_syntax_mix_fails(task_conf: dict):
"""If a task with a mix of Cylc7 and 8 syntax is passed to get_platform
this should return an error.
"""
task_conf = {
'platform': 'localhost',
'remote': {
'host': 'localhost'
}
}
with pytest.raises(
PlatformLookupError,
match=r'A mixture of Cylc 7 \(host\) and Cylc 8 \(platform\).*'
match=(
r"Task .* has the following deprecated '\[runtime\]' section\(s\) "
r"which cannot be used with 'platform.*"
)
):
get_platform(task_conf)

Expand Down

0 comments on commit 2a34672

Please sign in to comment.