Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

workflow-state command back compat. #5237

Merged
merged 8 commits into from
Aug 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions changes.d/5237.feat.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Back-compat: allow workflow-state xtriggers (and the `cylc workflow-state`
command) to read Cylc 7 databases.
31 changes: 26 additions & 5 deletions cylc/flow/dbstatecheck.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,13 @@ class CylcWorkflowDBChecker:
],
}

def __init__(self, rund, workflow):
db_path = expand_path(
rund, workflow, "log", CylcWorkflowDAO.DB_FILE_BASE_NAME
)
def __init__(self, rund, workflow, db_path=None):
# (Explicit dp_path arg is to make testing easier).
if db_path is None:
# Infer DB path from workflow name and run dir.
db_path = expand_path(
rund, workflow, "log", CylcWorkflowDAO.DB_FILE_BASE_NAME
)
if not os.path.exists(db_path):
raise OSError(errno.ENOENT, os.strerror(errno.ENOENT), db_path)
self.conn = sqlite3.connect(db_path, timeout=10.0)
Expand All @@ -73,7 +76,7 @@ def display_maps(res):
sys.stdout.write((", ").join(row) + "\n")

def get_remote_point_format(self):
"""Query a remote workflow database for a 'cycle point format' entry"""
"""Query a workflow database for a 'cycle point format' entry"""
for row in self.conn.execute(
rf'''
SELECT
Expand All @@ -87,6 +90,24 @@ def get_remote_point_format(self):
):
return row[0]

def get_remote_point_format_compat(self):
"""Query a Cylc 7 suite database for a 'cycle point format' entry.
Back compat for Cylc 8 workflow state triggers targeting Cylc 7 DBs.
"""
for row in self.conn.execute(
rf'''
SELECT
value
FROM
{CylcWorkflowDAO.TABLE_SUITE_PARAMS}
WHERE
key==?
''', # nosec (table name is code constant)
['cycle_point_format']
):
return row[0]

def state_lookup(self, state):
"""allows for multiple states to be searched via a status alias"""
if state in self.STATE_ALIASES:
Expand Down
7 changes: 7 additions & 0 deletions cylc/flow/rundb.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,13 @@ class CylcWorkflowDAO:
TABLE_BROADCAST_STATES = "broadcast_states"
TABLE_INHERITANCE = "inheritance"
TABLE_WORKFLOW_PARAMS = "workflow_params"
# BACK COMPAT: suite_params
# This Cylc 7 DB table is needed to allow workflow-state
# xtriggers (and the `cylc workflow-state` command) to
# work with Cylc 7 workflows.
# url: https://github.com/cylc/cylc-flow/issues/5236
# remove at: 8.x
TABLE_SUITE_PARAMS = "suite_params"
TABLE_WORKFLOW_FLOWS = "workflow_flows"
TABLE_WORKFLOW_TEMPLATE_VARS = "workflow_template_vars"
TABLE_TASK_JOBS = "task_jobs"
Expand Down
8 changes: 7 additions & 1 deletion cylc/flow/scripts/workflow_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,13 @@
sys.stderr.write('\n')

if connected and self.args['cycle']:
fmt = self.checker.get_remote_point_format()
try:
fmt = self.checker.get_remote_point_format()
except sqlite3.OperationalError as exc:
try:
fmt = self.checker.get_remote_point_format_compat()
except sqlite3.OperationalError:
raise exc # original error

Check warning on line 124 in cylc/flow/scripts/workflow_state.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/scripts/workflow_state.py#L120-L124

Added lines #L120 - L124 were not covered by tests
if fmt:
my_parser = TimePointParser()
my_point = my_parser.parse(self.args['cycle'], dump_format=fmt)
Expand Down
8 changes: 7 additions & 1 deletion cylc/flow/xtriggers/workflow_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,13 @@
except (OSError, sqlite3.Error):
# Failed to connect to DB; target workflow may not be started.
return (False, None)
fmt = checker.get_remote_point_format()
try:
fmt = checker.get_remote_point_format()
except sqlite3.OperationalError as exc:
try:
fmt = checker.get_remote_point_format_compat()
except sqlite3.OperationalError:
raise exc # original error

Check warning on line 99 in cylc/flow/xtriggers/workflow_state.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/xtriggers/workflow_state.py#L98-L99

Added lines #L98 - L99 were not covered by tests
if fmt:
my_parser = TimePointParser()
point = str(my_parser.parse(point, dump_format=fmt))
Expand Down
20 changes: 20 additions & 0 deletions tests/unit/test_workflow_db_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
CylcWorkflowDAO,
WorkflowDatabaseManager,
)
from cylc.flow.dbstatecheck import CylcWorkflowDBChecker


@pytest.fixture
Expand Down Expand Up @@ -116,3 +117,22 @@ def test_check_workflow_db_compat(_setup_db, capsys):

with pytest.raises(ServiceFileError, match='99.99'):
WorkflowDatabaseManager.check_db_compatibility(pri_path)


def test_cylc_7_db_wflow_params_table(_setup_db):
"""Test back-compat needed by workflow state xtrigger for Cylc 7 DBs."""
ptformat = "CCYY"
create = r'CREATE TABLE suite_params(key TEXT, value TEXT)'
insert = (
r'INSERT INTO suite_params VALUES'
rf'("cycle_point_format", "{ptformat}")'
)
db_file_name = _setup_db([create, insert])
checker = CylcWorkflowDBChecker('foo', 'bar', db_path=db_file_name)

with pytest.raises(
sqlite3.OperationalError, match="no such table: workflow_params"
):
checker.get_remote_point_format()

assert checker.get_remote_point_format_compat() == ptformat
48 changes: 47 additions & 1 deletion tests/unit/xtriggers/test_workflow_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from pathlib import Path
import sqlite3
from typing import Callable
from unittest.mock import Mock


from cylc.flow.workflow_files import WorkflowFiles
from cylc.flow.xtriggers.workflow_state import workflow_state
from ..conftest import MonkeyMock

Expand All @@ -38,3 +40,47 @@ def test_inferred_run(tmp_run_dir: Callable, monkeymock: MonkeyMock):
_, results = workflow_state(id_, task='precious', point='3000')
mock_db_checker.assert_called_once_with(cylc_run_dir, expected_workflow_id)
assert results['workflow'] == expected_workflow_id


def test_back_compat(tmp_run_dir):
"""Test workflow_state xtrigger backwards compatibility with Cylc 7
database."""
id_ = 'celebrimbor'
c7_run_dir: Path = tmp_run_dir(id_)
(c7_run_dir / WorkflowFiles.FLOW_FILE).rename(
c7_run_dir / WorkflowFiles.SUITE_RC
)
db_file = c7_run_dir / 'log' / 'db'
db_file.parent.mkdir(exist_ok=True)
# Note: cannot use CylcWorkflowDAO here as creating outdated DB
conn = sqlite3.connect(str(db_file))
try:
conn.execute(r"""
CREATE TABLE suite_params(key TEXT, value TEXT, PRIMARY KEY(key));
""")
conn.execute(r"""
CREATE TABLE task_states(
name TEXT, cycle TEXT, time_created TEXT, time_updated TEXT,
submit_num INTEGER, status TEXT, PRIMARY KEY(name, cycle)
);
""")
conn.executemany(
r'INSERT INTO "suite_params" VALUES(?,?);',
[('cylc_version', '7.8.12'),
('cycle_point_format', '%Y'),
('cycle_point_tz', 'Z')]
)
conn.execute(r"""
INSERT INTO "task_states" VALUES(
'mithril','2012','2023-01-30T18:19:15Z','2023-01-30T18:19:15Z',
0,'succeeded'
);
""")
conn.commit()
finally:
conn.close()

satisfied, _ = workflow_state(id_, task='mithril', point='2012')
assert satisfied
satisfied, _ = workflow_state(id_, task='arkenstone', point='2012')
assert not satisfied
Loading