diff --git a/python/lsst/pipe/base/quantum_provenance_graph.py b/python/lsst/pipe/base/quantum_provenance_graph.py index f6949228..15508d9d 100644 --- a/python/lsst/pipe/base/quantum_provenance_graph.py +++ b/python/lsst/pipe/base/quantum_provenance_graph.py @@ -42,7 +42,7 @@ import itertools import logging import uuid -from collections.abc import Iterator, Iterable, Sequence +from collections.abc import Iterator, Sequence from enum import Enum from typing import TYPE_CHECKING, ClassVar, Literal, NamedTuple, TypedDict, cast @@ -484,10 +484,10 @@ def add_quantum_info(self, info: QuantumInfo, butler: Butler, do_store_logs: boo self.n_unknown += 1 case unrecognized_state: raise AssertionError(f"Unrecognized quantum status {unrecognized_state!r}") - + def add_data_id_group(self, other: TaskSummary) -> None: """Add information from a `TaskSummary` over one dataquery-identified - group to another, as part of aggregating `Summary` reports. + group to another, as part of aggregating `Summary` reports. Parameters ---------- @@ -496,13 +496,13 @@ def add_data_id_group(self, other: TaskSummary) -> None: """ self.n_successful += other.n_successful self.n_blocked += other.n_blocked - self.n_not_attempted += other.n_not_attempted + self.n_unknown += other.n_unknown self.n_expected += other.n_expected self.wonky_quanta.extend(other.wonky_quanta) self.recovered_quanta.extend(other.recovered_quanta) self.failed_quanta.extend(other.failed_quanta) - + class CursedDatasetSummary(pydantic.BaseModel): """A summary of all the relevant information on a cursed dataset.""" @@ -567,7 +567,7 @@ class DatasetTypeSummary(pydantic.BaseModel): runs. """ - producer: str + producer: str = "" """The name of the task which produced this dataset. """ @@ -643,27 +643,30 @@ def add_dataset_info(self, info: DatasetInfo, producer_info: QuantumInfo) -> Non self.n_predicted_only += 1 case unrecognized_state: raise AssertionError(f"Unrecognized dataset status {unrecognized_state!r}") - + def add_data_id_group(self, other: DatasetTypeSummary) -> None: """Add information from a `DatasetTypeSummary` over one dataquery-identified group to another, as part of aggregating `Summary` - reports. + reports. Parameters ---------- other : `DatasetTypeSummary` `DatasetTypeSummary` to aggregate. """ - if self.producer: + if self.producer and other.producer: # Guard against empty string if self.producer != other.producer: - _LOG.warning("Producer for dataset type is not consistent: %r != %r.", self.producer, other.producer) + _LOG.warning( + "Producer for dataset type is not consistent: %r != %r.", self.producer, other.producer + ) _LOG.warning("Ignoring %r.", other.producer) else: - self.producer = other.producer + if other.producer and not self.producer: + self.producer = other.producer - self.n_published += other.n_published - self.n_unpublished += other.n_unpublished + self.n_visible += other.n_visible + self.n_shadowed += other.n_shadowed self.n_predicted_only += other.n_predicted_only self.n_expected += other.n_expected @@ -686,15 +689,15 @@ class Summary(pydantic.BaseModel): """ @classmethod - def aggregate(cls, summaries: Iterable[Summary]) -> Summary: + def aggregate(cls, summaries: Sequence[Summary]) -> Summary: """Combine summaries from disjoint data id groups into an overall summary of common tasks and datasets. Intended for use when the same pipeline has been run over all groups. Parameters ---------- - summaries : `Iterable[Summary]` - Iterable of all `Summary` objects to aggregate. 
+ summaries : `Sequence[Summary]` + Sequence of all `Summary` objects to aggregate. """ result = cls() for summary in summaries: diff --git a/tests/test_quantum_provenance_graph.py b/tests/test_quantum_provenance_graph.py index 9c8b42c8..d5077812 100644 --- a/tests/test_quantum_provenance_graph.py +++ b/tests/test_quantum_provenance_graph.py @@ -74,6 +74,7 @@ class QuantumProvenanceGraphTestCase(unittest.TestCase): More tests are in lsst/ci_middleware/tests/test_prod_outputs.py and lsst/ci_middleware/tests/test_rc2_outputs.py """ + def test_qpg_reports(self) -> None: """Test that we can add a new graph to the `QuantumProvenanceGraph`. @@ -98,28 +99,7 @@ def test_qpg_reports(self) -> None: self.assertListEqual(task_summary.wonky_quanta, []) self.assertEqual(task_summary.n_wonky, 0) self.assertEqual(task_summary.n_failed, 0) - expected_mock_datasets = [ - "add_dataset1", - "add2_dataset1", - "task0_metadata", - "task0_log", - "add_dataset2", - "add2_dataset2", - "task1_metadata", - "task1_log", - "add_dataset3", - "add2_dataset3", - "task2_metadata", - "task2_log", - "add_dataset4", - "add2_dataset4", - "task3_metadata", - "task3_log", - "add_dataset5", - "add2_dataset5", - "task4_metadata", - "task4_log", - ] + for dataset_type_name, dataset_type_summary in summary.datasets.items(): self.assertListEqual( dataset_type_summary.unsuccessful_datasets, @@ -153,16 +133,12 @@ def test_qpg_reports(self) -> None: self.assertEqual(dataset_type_summary.producer, "task4") def test_aggregate_reports(self) -> None: - """Test aggregating reports from the `QuantumProvenanceGraph.` - - More tests are in lsst/ci_middleware/tests/test_prod_outputs.py and - lsst/ci_middleware/tests/test_rc2_outputs.py""" + """Test aggregating reports from the `QuantumProvenanceGraph.`""" with temporaryDirectory() as root: # make a simple qgraph to make an execution report on butler, qgraph = simpleQGraph.makeSimpleQGraph(root=root) qpg = QuantumProvenanceGraph() - qpg.add_new_graph(butler, qgraph) - qpg.resolve_duplicates(butler) + qpg.assemble_quantum_provenance_graph(butler, [qgraph]) summary = qpg.to_summary(butler) # Check that aggregating one summary only does not cause an error one_graph_only_sum = Summary.aggregate([summary]) @@ -173,7 +149,7 @@ def test_aggregate_reports(self) -> None: for task_summary in one_graph_only_sum.tasks.values(): self.assertEqual(task_summary.n_successful, 0) self.assertEqual(task_summary.n_blocked, 0) - self.assertEqual(task_summary.n_not_attempted, 1) + self.assertEqual(task_summary.n_unknown, 1) self.assertEqual(task_summary.n_expected, 1) self.assertListEqual(task_summary.failed_quanta, []) self.assertListEqual(task_summary.recovered_quanta, []) @@ -184,8 +160,8 @@ def test_aggregate_reports(self) -> None: self.assertListEqual( dataset_type_summary.unsuccessful_datasets, [{"instrument": "INSTR", "detector": 0}] ) - self.assertEqual(dataset_type_summary.n_published, 0) - self.assertEqual(dataset_type_summary.n_unpublished, 0) + self.assertEqual(dataset_type_summary.n_visible, 0) + self.assertEqual(dataset_type_summary.n_shadowed, 0) self.assertEqual(dataset_type_summary.n_predicted_only, 0) self.assertEqual(dataset_type_summary.n_expected, 1) self.assertEqual(dataset_type_summary.n_cursed, 0) @@ -212,7 +188,7 @@ def test_aggregate_reports(self) -> None: for task_summary in two_identical_graph_sum.tasks.values(): self.assertEqual(task_summary.n_successful, 0) self.assertEqual(task_summary.n_blocked, 0) - self.assertEqual(task_summary.n_not_attempted, 2) + 
self.assertEqual(task_summary.n_unknown, 2) self.assertEqual(task_summary.n_expected, 2) self.assertListEqual(task_summary.failed_quanta, []) self.assertListEqual(task_summary.recovered_quanta, []) @@ -224,8 +200,8 @@ def test_aggregate_reports(self) -> None: dataset_type_summary.unsuccessful_datasets, [{"instrument": "INSTR", "detector": 0}, {"instrument": "INSTR", "detector": 0}], ) - self.assertEqual(dataset_type_summary.n_published, 0) - self.assertEqual(dataset_type_summary.n_unpublished, 0) + self.assertEqual(dataset_type_summary.n_visible, 0) + self.assertEqual(dataset_type_summary.n_shadowed, 0) self.assertEqual(dataset_type_summary.n_predicted_only, 0) self.assertEqual(dataset_type_summary.n_expected, 2) self.assertEqual(dataset_type_summary.n_cursed, 0) @@ -244,7 +220,6 @@ def test_aggregate_reports(self) -> None: case name if name in ["add_dataset5", "add2_dataset5", "task4_metadata", "task4_log"]: self.assertEqual(dataset_type_summary.producer, "task4") - # Let's see if we can change lots of counts and info for a task # which exists in summary 1 and append to the overall summary # effectively. This summary has a lot of valid variations. @@ -254,7 +229,7 @@ def test_aggregate_reports(self) -> None: update={ "n_successful": 10, "n_blocked": 20, - "n_not_attempted": 4, + "n_unknown": 4, "n_expected": 47, "failed_quanta": [ { @@ -293,8 +268,8 @@ def test_aggregate_reports(self) -> None: deep=True, update={ "producer": "task4", - "n_published": 0, - "n_unpublished": 0, + "n_visible": 0, + "n_shadowed": 0, "n_predicted_only": 0, "n_expected": 47, "cursed_datasets": [{"instrument": "INSTR", "detector": 7}], @@ -310,8 +285,8 @@ def test_aggregate_reports(self) -> None: deep=True, update={ "producer": "task4", - "n_published": 0, - "n_unpublished": 0, + "n_visible": 0, + "n_shadowed": 0, "n_predicted_only": 0, "n_expected": 47, "cursed_datasets": [ @@ -331,44 +306,52 @@ def test_aggregate_reports(self) -> None: if task_label == "task4": self.assertEqual(task_summary.n_successful, 10) self.assertEqual(task_summary.n_blocked, 20) - self.assertEqual(task_summary.n_not_attempted, 5) + self.assertEqual(task_summary.n_unknown, 5) self.assertEqual(task_summary.n_expected, 48) - self.assertListEqual(task_summary.failed_quanta, [ - { - "data_id": {"instrument": "INSTR", "detector": 1}, - "runs": {"run1": "failed"}, - "messages": ["Error on detector 1", "Second error on detector 1"], - }, - { - "data_id": {"instrument": "INSTR", "detector": 2}, - "runs": {"run1": "failed", "run2": "failed"}, - "messages": [], - }, - { - "data_id": {"instrument": "INSTR", "detector": 3}, - "runs": {"run1": "failed"}, - "messages": ["Error on detector 3"], - }, - ]) - self.assertListEqual(task_summary.recovered_quanta, [ - {"instrument": "INSTR", "detector": 4}, - {"instrument": "INSTR", "detector": 5}, - {"instrument": "INSTR", "detector": 6}, - ]) - self.assertListEqual(task_summary.wonky_quanta, - [{ - "data_id": {"instrument": "INSTR", "detector": 7}, - "runs": {"run1": "successful", "run2": "failed"}, - "messages": [ - "This one is wonky because it moved from successful to failed." 
- ], - }]) + self.assertListEqual( + task_summary.failed_quanta, + [ + { + "data_id": {"instrument": "INSTR", "detector": 1}, + "runs": {"run1": "failed"}, + "messages": ["Error on detector 1", "Second error on detector 1"], + }, + { + "data_id": {"instrument": "INSTR", "detector": 2}, + "runs": {"run1": "failed", "run2": "failed"}, + "messages": [], + }, + { + "data_id": {"instrument": "INSTR", "detector": 3}, + "runs": {"run1": "failed"}, + "messages": ["Error on detector 3"], + }, + ], + ) + self.assertListEqual( + task_summary.recovered_quanta, + [ + {"instrument": "INSTR", "detector": 4}, + {"instrument": "INSTR", "detector": 5}, + {"instrument": "INSTR", "detector": 6}, + ], + ) + self.assertListEqual( + task_summary.wonky_quanta, + [ + { + "data_id": {"instrument": "INSTR", "detector": 7}, + "runs": {"run1": "successful", "run2": "failed"}, + "messages": ["This one is wonky because it moved from successful to failed."], + } + ], + ) self.assertEqual(task_summary.n_wonky, 1) self.assertEqual(task_summary.n_failed, 3) else: self.assertEqual(task_summary.n_successful, 0) self.assertEqual(task_summary.n_blocked, 0) - self.assertEqual(task_summary.n_not_attempted, 2) + self.assertEqual(task_summary.n_unknown, 2) self.assertEqual(task_summary.n_expected, 2) self.assertListEqual(task_summary.failed_quanta, []) self.assertListEqual(task_summary.recovered_quanta, []) @@ -387,8 +370,8 @@ def test_aggregate_reports(self) -> None: {"instrument": "INSTR", "detector": 3}, ], ) - self.assertEqual(dataset_type_summary.n_published, 0) - self.assertEqual(dataset_type_summary.n_unpublished, 0) + self.assertEqual(dataset_type_summary.n_visible, 0) + self.assertEqual(dataset_type_summary.n_shadowed, 0) self.assertEqual(dataset_type_summary.n_predicted_only, 0) self.assertEqual(dataset_type_summary.n_expected, 48) self.assertEqual(dataset_type_summary.n_cursed, 1) @@ -401,8 +384,8 @@ def test_aggregate_reports(self) -> None: dataset_type_summary.unsuccessful_datasets, [{"instrument": "INSTR", "detector": 0}, {"instrument": "INSTR", "detector": 0}], ) - self.assertEqual(dataset_type_summary.n_published, 0) - self.assertEqual(dataset_type_summary.n_unpublished, 0) + self.assertEqual(dataset_type_summary.n_visible, 0) + self.assertEqual(dataset_type_summary.n_shadowed, 0) self.assertEqual(dataset_type_summary.n_predicted_only, 0) self.assertEqual(dataset_type_summary.n_expected, 2) self.assertEqual(dataset_type_summary.n_cursed, 0) @@ -425,57 +408,48 @@ def test_aggregate_reports(self) -> None: # works summary4 = summary.model_copy(deep=True) summary4.tasks["task5"] = TaskSummary( - n_successful = 0, - n_blocked = 0, - n_not_attempted = 1, - n_expected = 1, - failed_quanta = [], - recovered_quanta = [], - wonky_quanta = [], - n_wonky = 0, - n_failed = 0 + n_successful=0, + n_blocked=0, + n_unknown=1, + n_expected=1, + failed_quanta=[], + recovered_quanta=[], + wonky_quanta=[], + n_wonky=0, + n_failed=0, ) summary4.datasets["add_dataset6"] = DatasetTypeSummary( - producer = "task5", - n_published = 0, - n_unpublished = 0, - n_predicted_only = 0, - n_expected = 1, - cursed_datasets = [], - unsuccessful_datasets = [{ - "instrument": "INSTR", - "detector": 0 - }], - n_cursed = 0, - n_unsuccessful = 1 + producer="task5", + n_visible=0, + n_shadowed=0, + n_predicted_only=0, + n_expected=1, + cursed_datasets=[], + unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}], + n_cursed=0, + n_unsuccessful=1, ) summary4.datasets["task5_log"] = DatasetTypeSummary( - producer = "task5", - n_published = 0, - 
n_unpublished = 0, - n_predicted_only = 0, - n_expected = 1, - cursed_datasets = [], - unsuccessful_datasets = [{ - "instrument": "INSTR", - "detector": 0 - }], - n_cursed = 0, - n_unsuccessful = 1 + producer="task5", + n_visible=0, + n_shadowed=0, + n_predicted_only=0, + n_expected=1, + cursed_datasets=[], + unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}], + n_cursed=0, + n_unsuccessful=1, ) summary4.datasets["task5_metadata"] = DatasetTypeSummary( - producer = "task5", - n_published = 0, - n_unpublished = 0, - n_predicted_only = 0, - n_expected = 1, - cursed_datasets = [], - unsuccessful_datasets = [{ - "instrument": "INSTR", - "detector": 0 - }], - n_cursed = 0, - n_unsuccessful = 1 + producer="task5", + n_visible=0, + n_shadowed=0, + n_predicted_only=0, + n_expected=1, + cursed_datasets=[], + unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}], + n_cursed=0, + n_unsuccessful=1, ) two_graphs_extra_task = Summary.aggregate([summary4, summary]) # Make sure the extra task is in there @@ -490,14 +464,14 @@ def test_aggregate_reports(self) -> None: self.assertEqual(task_summary.n_failed, 0) self.assertIn(task_label, ["task0", "task1", "task2", "task3", "task4", "task5"]) if task_label == "task5": - self.assertEqual(task_summary.n_not_attempted, 1) + self.assertEqual(task_summary.n_unknown, 1) self.assertEqual(task_summary.n_expected, 1) else: - self.assertEqual(task_summary.n_not_attempted, 2) + self.assertEqual(task_summary.n_unknown, 2) self.assertEqual(task_summary.n_expected, 2) for dataset_type_name, dataset_type_summary in two_graphs_extra_task.datasets.items(): - self.assertEqual(dataset_type_summary.n_published, 0) - self.assertEqual(dataset_type_summary.n_unpublished, 0) + self.assertEqual(dataset_type_summary.n_visible, 0) + self.assertEqual(dataset_type_summary.n_shadowed, 0) self.assertEqual(dataset_type_summary.n_predicted_only, 0) self.assertEqual(dataset_type_summary.n_cursed, 0) self.assertListEqual(dataset_type_summary.cursed_datasets, []) @@ -505,17 +479,20 @@ def test_aggregate_reports(self) -> None: self.assertEqual(dataset_type_summary.n_expected, 1) self.assertEqual(dataset_type_summary.n_unsuccessful, 1) self.assertListEqual( - dataset_type_summary.unsuccessful_datasets, - [{"instrument": "INSTR", "detector": 0}], - ) + dataset_type_summary.unsuccessful_datasets, + [{"instrument": "INSTR", "detector": 0}], + ) else: self.assertEqual(dataset_type_summary.n_expected, 2) self.assertEqual(dataset_type_summary.n_unsuccessful, 2) self.assertListEqual( - dataset_type_summary.unsuccessful_datasets, - [{"instrument": "INSTR", "detector": 0}, {"instrument": "INSTR", "detector": 0}], + dataset_type_summary.unsuccessful_datasets, + [{"instrument": "INSTR", "detector": 0}, {"instrument": "INSTR", "detector": 0}], + ) + self.assertIn( + dataset_type_name, + expected_mock_datasets + ["add_dataset6", "task5_metadata", "task5_log"], ) - self.assertIn(dataset_type_name, expected_mock_datasets+["add_dataset6", "task5_metadata", "task5_log"]) match dataset_type_name: case name if name in ["add_dataset1", "add2_dataset1", "task0_metadata", "task0_log"]: self.assertEqual(dataset_type_summary.producer, "task0") @@ -535,110 +512,99 @@ def test_aggregate_reports(self) -> None: # a certain dataset in graph 1, but task 2 produced the same dataset # in graph 2, the graphs are likely not comparable. 
summary5 = summary.model_copy(deep=True) - summary5.datasets["add_dataset3"] = summary.datasets["add_dataset3"].model_copy(deep=True, update={ - "producer": "task0", - "n_published": 0, - "n_unpublished": 0, - "n_predicted_only": 0, - "n_expected": 1, - "cursed_datasets": [], - "unsuccessful_datasets": [ - { - "instrument": "INSTR", - "detector": 0 - }, - ], - "n_cursed": 0, - "n_unsuccessful": 1 - }) + summary5.datasets["add_dataset3"] = summary.datasets["add_dataset3"].model_copy( + deep=True, + update={ + "producer": "task0", + "n_visible": 0, + "n_shadowed": 0, + "n_predicted_only": 0, + "n_expected": 1, + "cursed_datasets": [], + "unsuccessful_datasets": [ + {"instrument": "INSTR", "detector": 0}, + ], + "n_cursed": 0, + "n_unsuccessful": 1, + }, + ) with self.assertLogs("lsst.pipe.base", level=lsst.utils.logging.VERBOSE) as warning_logs: - should_warn = Summary.aggregate([summary, summary5]) - self.assertIn("WARNING:lsst.pipe.base.quantum_provenance_graph:Producer for dataset type is not consistent: 'task2' != 'task0'.", warning_logs.output[0]) - self.assertIn("WARNING:lsst.pipe.base.quantum_provenance_graph:Ignoring 'task0'.", warning_logs.output[1]) + Summary.aggregate([summary, summary5]) + self.assertIn( + "WARNING:lsst.pipe.base.quantum_provenance_graph:Producer for dataset type is not consistent" + ": 'task2' != 'task0'.", + warning_logs.output[0], + ) + self.assertIn( + "WARNING:lsst.pipe.base.quantum_provenance_graph:Ignoring 'task0'.", warning_logs.output[1] + ) # Next up, we're going to try to aggregate summary with a dictionary # and then with some garbage. Neither of these should work! with self.assertRaises(TypeError): - Summary.aggregate(summary,{ + Summary.aggregate( + summary, + { "tasks": { "task0": { - "n_successful": 0, - "n_blocked": 0, - "n_not_attempted": 1, - "n_expected": 1, - "failed_quanta": [], - "recovered_quanta": [], - "wonky_quanta": [], - "n_wonky": 0, - "n_failed": 0 - }, - "datasets": { - "add_dataset1": { - "producer": "task0", - "n_published": 0, - "n_unpublished": 0, - "n_predicted_only": 0, - "n_expected": 1, - "cursed_datasets": [], - "unsuccessful_datasets": [ - { - "instrument": "INSTR", - "detector": 0 - } - ], - "n_cursed": 0, - "n_unsuccessful": 1 + "n_successful": 0, + "n_blocked": 0, + "n_unknown": 1, + "n_expected": 1, + "failed_quanta": [], + "recovered_quanta": [], + "wonky_quanta": [], + "n_wonky": 0, + "n_failed": 0, }, - "add2_dataset1": { - "producer": "task0", - "n_published": 0, - "n_unpublished": 0, - "n_predicted_only": 0, - "n_expected": 1, - "cursed_datasets": [], - "unsuccessful_datasets": [ - { - "instrument": "INSTR", - "detector": 0 - } - ], - "n_cursed": 0, - "n_unsuccessful": 1 + "datasets": { + "add_dataset1": { + "producer": "task0", + "n_visible": 0, + "n_shadowed": 0, + "n_predicted_only": 0, + "n_expected": 1, + "cursed_datasets": [], + "unsuccessful_datasets": [{"instrument": "INSTR", "detector": 0}], + "n_cursed": 0, + "n_unsuccessful": 1, + }, + "add2_dataset1": { + "producer": "task0", + "n_visible": 0, + "n_shadowed": 0, + "n_predicted_only": 0, + "n_expected": 1, + "cursed_datasets": [], + "unsuccessful_datasets": [{"instrument": "INSTR", "detector": 0}], + "n_cursed": 0, + "n_unsuccessful": 1, + }, + "task0_metadata": { + "producer": "task0", + "n_visible": 0, + "n_shadowed": 0, + "n_predicted_only": 0, + "n_expected": 1, + "cursed_datasets": [], + "unsuccessful_datasets": [{"instrument": "INSTR", "detector": 0}], + "n_cursed": 0, + "n_unsuccessful": 1, + }, + "task0_log": { + "producer": "task0", + 
"n_visible": 0, + "n_shadowed": 0, + "n_predicted_only": 0, + "n_expected": 1, + "cursed_datasets": [], + "unsuccessful_datasets": [{"instrument": "INSTR", "detector": 0}], + "n_cursed": 0, + "n_unsuccessful": 1, + }, }, - "task0_metadata": { - "producer": "task0", - "n_published": 0, - "n_unpublished": 0, - "n_predicted_only": 0, - "n_expected": 1, - "cursed_datasets": [], - "unsuccessful_datasets": [ - { - "instrument": "INSTR", - "detector": 0 - } - ], - "n_cursed": 0, - "n_unsuccessful": 1 - }, - "task0_log": { - "producer": "task0", - "n_published": 0, - "n_unpublished": 0, - "n_predicted_only": 0, - "n_expected": 1, - "cursed_datasets": [], - "unsuccessful_datasets": [ - { - "instrument": "INSTR", - "detector": 0 - } - ], - "n_cursed": 0, - "n_unsuccessful": 1 - },}}}) + } + }, + ) Summary.aggregate([summary, []]) Summary.aggregate([summary, "some_garbage"]) - - -