Skip to content

Commit

Permalink
Merge branch 'main' into refactor/submission-templates
Browse files Browse the repository at this point in the history
  • Loading branch information
b-butler authored Oct 20, 2023
2 parents 0cce0d6 + fd1f3d0 commit f8db823
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 30 deletions.
11 changes: 11 additions & 0 deletions changelog.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,17 @@ Changelog
The **signac-flow** package follows `semantic versioning <https://semver.org/>`_.
The numbers in brackets denote the related GitHub issue and/or pull request.

Version 0.27
============

[0.27.0] -- 202x-xx-xx
----------------------

Fixed
+++++

- Restored static output of non-singleton groups (#773, #774).

Version 0.26
============

Expand Down
81 changes: 53 additions & 28 deletions flow/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -2705,13 +2705,15 @@ def compute_status(data):
{
"aggregate_id": aggregate_id,
"group_name": op_name,
"status": operation_status,
# Need to copy status otherwise we can overwrite the status
# dictionary of all constituent operations at once.
"status": operation_status.copy(),
"_error": error_text,
}
)
return result

status_groups = set(self._gather_flow_groups(names))
status_groups = self._gather_selected_flow_groups(names)

with self._buffered():
aggregate_groups = list(
Expand Down Expand Up @@ -3682,7 +3684,7 @@ def select(operation):
# Generate _JobOperation instances for selected groups and aggregates.
with self._buffered():
operations = []
run_groups = set(self._gather_flow_groups(names))
run_groups = set(self._gather_executable_flow_groups(names))
for (
aggregate_id,
aggregate,
Expand Down Expand Up @@ -3744,17 +3746,16 @@ def key_func_by_job(operation):
operations, pretend=pretend, np=np, timeout=timeout, progress=progress
)

def _gather_flow_groups(self, names=None):
def _gather_selected_flow_groups(self, names=None):
r"""Grabs :class:`~.FlowGroup`\ s that match any of a set of names.
The provided names can be any regular expressions that fully match a group name.
The provided names can be any regular expression that fully matches a group name.
Parameters
----------
names : iterable of :class:`str`
Only select operations that match the provided set of names
(interpreted as regular expressions), or all if the argument is
None. (Default value = None)
Only select groups that match the provided set of names (interpreted as regular
expressions), or all if the argument is None. (Default value = None)
Returns
-------
Expand All @@ -3763,24 +3764,48 @@ def _gather_flow_groups(self, names=None):
"""
if names is None:
# If no names are selected, use all singleton groups
operations = [self._groups[name] for name in self.operations]
else:
operations = {}
for name in names:
if name in operations:
continue
groups = [
group
for group_name, group in self.groups.items()
if re.fullmatch(name, group_name)
]
if len(groups) > 0:
for group in groups:
operations[group.name] = group
else:
continue
operations = list(operations.values())
return list(self._groups.values())
operations = {}
for name in names:
if name in operations:
continue
groups = [
group
for group_name, group in self.groups.items()
if re.fullmatch(name, group_name)
]
for group in groups:
operations[group.name] = group
return list(operations.values())

def _gather_executable_flow_groups(self, names=None):
r"""Grabs immediately executable flow groups that match any given name.
The provided names can be any regular expression that fully match a group name.
Note
----
The behavior is distinct from ``_gather_selected_flow_groups`` in that
for execution the default set is not all groups but all singleton
groups (operations).
Parameters
----------
names : iterable of :class:`str`
Only select groups that match the provided set of names (interpreted as regular
expressions), or all singleton groups if the argument is None. (Default value = None)
Returns
-------
list
List of groups matching the provided names.
"""
if names is None:
return [self._groups[op_name] for op_name in self.operations]
operations = self._gather_selected_flow_groups(names)
# Have to verify no overlap to ensure all returned groups are
# simultaneously executable.
if not FlowProject._verify_group_compatibility(operations):
raise ValueError(
"Cannot specify groups or operations that will be included "
Expand Down Expand Up @@ -3829,7 +3854,7 @@ def _get_submission_operations(
been collected appropriately from its contained operations.
"""
submission_groups = set(self._gather_flow_groups(names))
submission_groups = set(self._gather_executable_flow_groups(names))

# Fetch scheduler status
scheduler_info = self._query_scheduler_status()
Expand Down Expand Up @@ -4485,7 +4510,7 @@ def _next_operations(
if operation_names is None:
selected_groups = {self._groups[name] for name in self.operations}
else:
selected_groups = set(self._gather_flow_groups(operation_names))
selected_groups = set(self._gather_executable_flow_groups(operation_names))
for (
aggregate_id,
aggregate,
Expand Down
Binary file modified tests/status_reference_data.tar.gz
Binary file not shown.
4 changes: 2 additions & 2 deletions tests/test_project.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,8 @@ def test_groups(self, groups, get_status):
assert group in operations_output

def test_operation_in_group(self, get_status):
with pytest.raises(ValueError):
get_status(operation=["op1", "group1"])
# Ensure groups with overlapping operations work
get_status(operation=["op1", "group1"])


class TestProjectStatusNoEligibleOperations(TestProjectBase):
Expand Down

0 comments on commit f8db823

Please sign in to comment.