From afa482ead53cd4b1b0fa93b042902e69cff07362 Mon Sep 17 00:00:00 2001 From: Orion Eiger Date: Thu, 18 Jul 2024 12:22:00 -0700 Subject: [PATCH 1/4] Add aggregate function for quantum provenance graph --- .../pipe/base/quantum_provenance_graph.py | 56 ++++++++++++++++++- 1 file changed, 54 insertions(+), 2 deletions(-) diff --git a/python/lsst/pipe/base/quantum_provenance_graph.py b/python/lsst/pipe/base/quantum_provenance_graph.py index 58f036e5..279882fc 100644 --- a/python/lsst/pipe/base/quantum_provenance_graph.py +++ b/python/lsst/pipe/base/quantum_provenance_graph.py @@ -42,7 +42,7 @@ import itertools import logging import uuid -from collections.abc import Iterator, Sequence +from collections.abc import Iterator, Iterable, Sequence from enum import Enum from typing import TYPE_CHECKING, ClassVar, Literal, NamedTuple, TypedDict, cast @@ -484,7 +484,22 @@ def add_quantum_info(self, info: QuantumInfo, butler: Butler, do_store_logs: boo self.n_unknown += 1 case unrecognized_state: raise AssertionError(f"Unrecognized quantum status {unrecognized_state!r}") - + + def add_data_id_group(self, other: TaskSummary) -> None: + """Docstring. + """ + self.n_successful += other.n_successful + self.n_blocked += other.n_blocked + self.n_not_attempted += other.n_not_attempted + self.n_expected += other.n_expected + + if self.wonky_quanta: + self.wonky_quanta.append(other.wonky_quanta) + if self.recovered_quanta: + self.recovered_quanta.append(other.recovered_quanta) + if self.failed_quanta: + self.failed_quanta.append(other.failed_quanta) + class CursedDatasetSummary(pydantic.BaseModel): """A summary of all the relevant information on a cursed dataset.""" @@ -625,6 +640,27 @@ def add_dataset_info(self, info: DatasetInfo, producer_info: QuantumInfo) -> Non self.n_predicted_only += 1 case unrecognized_state: raise AssertionError(f"Unrecognized dataset status {unrecognized_state!r}") + + def add_data_id_group(self, other: DatasetTypeSummary) -> None: + """Docstring. + """ + if self.producer: + # Guard against empty string + if self.producer != other.producer: + _LOG.warning("Producer for dataset type is not consistent: %r != %r.", self.producer, other.producer) + _LOG.warning("Ignoring %r.", other.producer) + else: + self.producer = other.producer + + self.n_published += other.n_published + self.n_unpublished += other.n_unpublished + self.n_predicted_only += other.n_predicted_only + self.n_expected += other.n_expected + + if self.cursed_datasets: + self.cursed_datasets.append(other.cursed_datasets) + if self.unsuccessful_datasets: + self.unsuccessful_datasets.append(other.unsuccessful_datasets) class Summary(pydantic.BaseModel): @@ -641,6 +677,22 @@ class Summary(pydantic.BaseModel): """Summaries for the datasets. """ + @classmethod + def aggregate(cls, summaries: Iterable[Summary]) -> Summary: + """Combine summaries from disjoint data id groups into an overall + summary of common tasks and datasets. Intended for use when the same + pipeline has been run over all groups. 
+ """ + result = cls() + for summary in summaries: + for label, task_summary in summary.tasks.items(): + result_task_summary = result.tasks.setdefault(label, TaskSummary()) + result_task_summary.add_data_id_group(task_summary) + for dataset_type, dataset_type_summary in summary.datasets.items(): + result_dataset_summary = result.datasets.setdefault(dataset_type, DatasetTypeSummary()) + result_dataset_summary.add_data_id_group(dataset_type_summary) + return result + class QuantumProvenanceGraph: """A set of already-run, merged quantum graphs with provenance From 5b867519ec911cd251c14f58a89aed51b3ce204e Mon Sep 17 00:00:00 2001 From: Orion Eiger Date: Wed, 14 Aug 2024 11:41:25 -0700 Subject: [PATCH 2/4] Add tests for QuantumProvenanceGraph.aggregate --- .../pipe/base/quantum_provenance_graph.py | 15 +- tests/test_quantum_provenance_graph.py | 518 +++++++++++++++++- 2 files changed, 521 insertions(+), 12 deletions(-) diff --git a/python/lsst/pipe/base/quantum_provenance_graph.py b/python/lsst/pipe/base/quantum_provenance_graph.py index 279882fc..d60862ca 100644 --- a/python/lsst/pipe/base/quantum_provenance_graph.py +++ b/python/lsst/pipe/base/quantum_provenance_graph.py @@ -493,12 +493,9 @@ def add_data_id_group(self, other: TaskSummary) -> None: self.n_not_attempted += other.n_not_attempted self.n_expected += other.n_expected - if self.wonky_quanta: - self.wonky_quanta.append(other.wonky_quanta) - if self.recovered_quanta: - self.recovered_quanta.append(other.recovered_quanta) - if self.failed_quanta: - self.failed_quanta.append(other.failed_quanta) + self.wonky_quanta.extend(other.wonky_quanta) + self.recovered_quanta.extend(other.recovered_quanta) + self.failed_quanta.extend(other.failed_quanta) class CursedDatasetSummary(pydantic.BaseModel): @@ -657,10 +654,8 @@ def add_data_id_group(self, other: DatasetTypeSummary) -> None: self.n_predicted_only += other.n_predicted_only self.n_expected += other.n_expected - if self.cursed_datasets: - self.cursed_datasets.append(other.cursed_datasets) - if self.unsuccessful_datasets: - self.unsuccessful_datasets.append(other.unsuccessful_datasets) + self.cursed_datasets.extend(other.cursed_datasets) + self.unsuccessful_datasets.extend(other.unsuccessful_datasets) class Summary(pydantic.BaseModel): diff --git a/tests/test_quantum_provenance_graph.py b/tests/test_quantum_provenance_graph.py index cdb2c2c9..0fcfa8b1 100644 --- a/tests/test_quantum_provenance_graph.py +++ b/tests/test_quantum_provenance_graph.py @@ -31,9 +31,33 @@ import unittest -from lsst.pipe.base.quantum_provenance_graph import QuantumProvenanceGraph +from lsst.pipe.base.quantum_provenance_graph import QuantumProvenanceGraph, Summary, TaskSummary, DatasetTypeSummary from lsst.pipe.base.tests import simpleQGraph from lsst.utils.tests import temporaryDirectory +import lsst.utils.logging + +expected_mock_datasets = [ + "add_dataset1", + "add2_dataset1", + "task0_metadata", + "task0_log", + "add_dataset2", + "add2_dataset2", + "task1_metadata", + "task1_log", + "add_dataset3", + "add2_dataset3", + "task2_metadata", + "task2_log", + "add_dataset4", + "add2_dataset4", + "task3_metadata", + "task3_log", + "add_dataset5", + "add2_dataset5", + "task4_metadata", + "task4_log", +] class QuantumProvenanceGraphTestCase(unittest.TestCase): @@ -45,7 +69,6 @@ class QuantumProvenanceGraphTestCase(unittest.TestCase): More tests are in lsst/ci_middleware/tests/test_prod_outputs.py and lsst/ci_middleware/tests/test_rc2_outputs.py """ - def test_qpg_reports(self) -> None: """Test that we can add a new graph 
to the `QuantumProvenanceGraph`. @@ -123,3 +146,494 @@ def test_qpg_reports(self) -> None: self.assertEqual(dataset_type_summary.producer, "task3") case name if name in ["add_dataset5", "add2_dataset5", "task4_metadata", "task4_log"]: self.assertEqual(dataset_type_summary.producer, "task4") + + def test_aggregate_reports(self) -> None: + """Test aggregating reports from the `QuantumProvenanceGraph.` + + More tests are in lsst/ci_middleware/tests/test_prod_outputs.py and + lsst/ci_middleware/tests/test_rc2_outputs.py""" + with temporaryDirectory() as root: + # make a simple qgraph to make an execution report on + butler, qgraph = simpleQGraph.makeSimpleQGraph(root=root) + qpg = QuantumProvenanceGraph() + qpg.add_new_graph(butler, qgraph) + qpg.resolve_duplicates(butler) + summary = qpg.to_summary(butler) + # Check that aggregating one summary only does not cause an error + one_graph_only_sum = Summary.aggregate([summary]) + + # Do the same tests as in `test_qpg_reports`, but on the + # 'aggregate' summary. Essentially, verify that the information in + # the report is preserved during the aggregation step. + for task_summary in one_graph_only_sum.tasks.values(): + self.assertEqual(task_summary.n_successful, 0) + self.assertEqual(task_summary.n_blocked, 0) + self.assertEqual(task_summary.n_not_attempted, 1) + self.assertEqual(task_summary.n_expected, 1) + self.assertListEqual(task_summary.failed_quanta, []) + self.assertListEqual(task_summary.recovered_quanta, []) + self.assertListEqual(task_summary.wonky_quanta, []) + self.assertEqual(task_summary.n_wonky, 0) + self.assertEqual(task_summary.n_failed, 0) + for dataset_type_name, dataset_type_summary in one_graph_only_sum.datasets.items(): + self.assertListEqual( + dataset_type_summary.unsuccessful_datasets, [{"instrument": "INSTR", "detector": 0}] + ) + self.assertEqual(dataset_type_summary.n_published, 0) + self.assertEqual(dataset_type_summary.n_unpublished, 0) + self.assertEqual(dataset_type_summary.n_predicted_only, 0) + self.assertEqual(dataset_type_summary.n_expected, 1) + self.assertEqual(dataset_type_summary.n_cursed, 0) + self.assertEqual(dataset_type_summary.n_unsuccessful, 1) + self.assertListEqual(dataset_type_summary.cursed_datasets, []) + self.assertIn(dataset_type_name, expected_mock_datasets) + match dataset_type_name: + case name if name in ["add_dataset1", "add2_dataset1", "task0_metadata", "task0_log"]: + self.assertEqual(dataset_type_summary.producer, "task0") + case name if name in ["add_dataset2", "add2_dataset2", "task1_metadata", "task1_log"]: + self.assertEqual(dataset_type_summary.producer, "task1") + case name if name in ["add_dataset3", "add2_dataset3", "task2_metadata", "task2_log"]: + self.assertEqual(dataset_type_summary.producer, "task2") + case name if name in ["add_dataset4", "add2_dataset4", "task3_metadata", "task3_log"]: + self.assertEqual(dataset_type_summary.producer, "task3") + case name if name in ["add_dataset5", "add2_dataset5", "task4_metadata", "task4_log"]: + self.assertEqual(dataset_type_summary.producer, "task4") + + # Now we test aggregating multiple summaries. First, we try + # aggregating with an exact copy and make sure we just have double + # the numbers. 
+ summary2 = summary.model_copy(deep=True) + two_identical_graph_sum = Summary.aggregate([summary, summary2]) + for task_summary in two_identical_graph_sum.tasks.values(): + self.assertEqual(task_summary.n_successful, 0) + self.assertEqual(task_summary.n_blocked, 0) + self.assertEqual(task_summary.n_not_attempted, 2) + self.assertEqual(task_summary.n_expected, 2) + self.assertListEqual(task_summary.failed_quanta, []) + self.assertListEqual(task_summary.recovered_quanta, []) + self.assertListEqual(task_summary.wonky_quanta, []) + self.assertEqual(task_summary.n_wonky, 0) + self.assertEqual(task_summary.n_failed, 0) + for dataset_type_name, dataset_type_summary in two_identical_graph_sum.datasets.items(): + self.assertListEqual( + dataset_type_summary.unsuccessful_datasets, + [{"instrument": "INSTR", "detector": 0}, {"instrument": "INSTR", "detector": 0}], + ) + self.assertEqual(dataset_type_summary.n_published, 0) + self.assertEqual(dataset_type_summary.n_unpublished, 0) + self.assertEqual(dataset_type_summary.n_predicted_only, 0) + self.assertEqual(dataset_type_summary.n_expected, 2) + self.assertEqual(dataset_type_summary.n_cursed, 0) + self.assertEqual(dataset_type_summary.n_unsuccessful, 2) + self.assertListEqual(dataset_type_summary.cursed_datasets, []) + self.assertIn(dataset_type_name, expected_mock_datasets) + match dataset_type_name: + case name if name in ["add_dataset1", "add2_dataset1", "task0_metadata", "task0_log"]: + self.assertEqual(dataset_type_summary.producer, "task0") + case name if name in ["add_dataset2", "add2_dataset2", "task1_metadata", "task1_log"]: + self.assertEqual(dataset_type_summary.producer, "task1") + case name if name in ["add_dataset3", "add2_dataset3", "task2_metadata", "task2_log"]: + self.assertEqual(dataset_type_summary.producer, "task2") + case name if name in ["add_dataset4", "add2_dataset4", "task3_metadata", "task3_log"]: + self.assertEqual(dataset_type_summary.producer, "task3") + case name if name in ["add_dataset5", "add2_dataset5", "task4_metadata", "task4_log"]: + self.assertEqual(dataset_type_summary.producer, "task4") + + + # Let's see if we can change lots of counts and info for a task + # which exists in summary 1 and append to the overall summary + # effectively. This summary has a lot of valid variations. 
+ summary3 = summary.model_copy(deep=True) + summary3.tasks["task4"] = summary.tasks["task4"].model_copy( + deep=True, + update={ + "n_successful": 10, + "n_blocked": 20, + "n_not_attempted": 4, + "n_expected": 47, + "failed_quanta": [ + { + "data_id": {"instrument": "INSTR", "detector": 1}, + "runs": {"run1": "failed"}, + "messages": ["Error on detector 1", "Second error on detector 1"], + }, + { + "data_id": {"instrument": "INSTR", "detector": 2}, + "runs": {"run1": "failed", "run2": "failed"}, + "messages": [], + }, + { + "data_id": {"instrument": "INSTR", "detector": 3}, + "runs": {"run1": "failed"}, + "messages": ["Error on detector 3"], + }, + ], + "recovered_quanta": [ + {"instrument": "INSTR", "detector": 4}, + {"instrument": "INSTR", "detector": 5}, + {"instrument": "INSTR", "detector": 6}, + ], + "wonky_quanta": [ + { + "data_id": {"instrument": "INSTR", "detector": 7}, + "runs": {"run1": "successful", "run2": "failed"}, + "messages": ["This one is wonky because it moved from successful to failed."], + } + ], + "n_wonky": 1, + "n_failed": 3, + }, + ) + summary3.datasets["add_dataset5"] = summary.model_copy( + deep=True, + update={ + "producer": "task4", + "n_published": 0, + "n_unpublished": 0, + "n_predicted_only": 0, + "n_expected": 47, + "cursed_datasets": [{"instrument": "INSTR", "detector": 7}], + "unsuccessful_datasets": [ + {"instrument": "INSTR", "detector": 0}, + {"instrument": "INSTR", "detector": 1}, + {"instrument": "INSTR", "detector": 2}, + {"instrument": "INSTR", "detector": 3}, + ], + }, + ) + summary3.datasets["add2_dataset5"] = summary.model_copy( + deep=True, + update={ + "producer": "task4", + "n_published": 0, + "n_unpublished": 0, + "n_predicted_only": 0, + "n_expected": 47, + "cursed_datasets": [ + {"instrument": "INSTR", "detector": 7}, + ], + "unsuccessful_datasets": [ + {"instrument": "INSTR", "detector": 0}, + {"instrument": "INSTR", "detector": 1}, + {"instrument": "INSTR", "detector": 2}, + {"instrument": "INSTR", "detector": 3}, + ], + }, + ) + # Test that aggregate with this file works + two_graphs_different_numbers = Summary.aggregate([summary, summary3]) + for task_label, task_summary in two_graphs_different_numbers.tasks.items(): + if task_label == "task4": + self.assertEqual(task_summary.n_successful, 10) + self.assertEqual(task_summary.n_blocked, 20) + self.assertEqual(task_summary.n_not_attempted, 5) + self.assertEqual(task_summary.n_expected, 48) + self.assertListEqual(task_summary.failed_quanta, [ + { + "data_id": {"instrument": "INSTR", "detector": 1}, + "runs": {"run1": "failed"}, + "messages": ["Error on detector 1", "Second error on detector 1"], + }, + { + "data_id": {"instrument": "INSTR", "detector": 2}, + "runs": {"run1": "failed", "run2": "failed"}, + "messages": [], + }, + { + "data_id": {"instrument": "INSTR", "detector": 3}, + "runs": {"run1": "failed"}, + "messages": ["Error on detector 3"], + }, + ]) + self.assertListEqual(task_summary.recovered_quanta, [ + {"instrument": "INSTR", "detector": 4}, + {"instrument": "INSTR", "detector": 5}, + {"instrument": "INSTR", "detector": 6}, + ]) + self.assertListEqual(task_summary.wonky_quanta, + [{ + "data_id": {"instrument": "INSTR", "detector": 7}, + "runs": {"run1": "successful", "run2": "failed"}, + "messages": [ + "This one is wonky because it moved from successful to failed." 
+ ], + }]) + self.assertEqual(task_summary.n_wonky, 1) + self.assertEqual(task_summary.n_failed, 3) + else: + self.assertEqual(task_summary.n_successful, 0) + self.assertEqual(task_summary.n_blocked, 0) + self.assertEqual(task_summary.n_not_attempted, 2) + self.assertEqual(task_summary.n_expected, 2) + self.assertListEqual(task_summary.failed_quanta, []) + self.assertListEqual(task_summary.recovered_quanta, []) + self.assertListEqual(task_summary.wonky_quanta, []) + self.assertEqual(task_summary.n_wonky, 0) + self.assertEqual(task_summary.n_failed, 0) + for dataset_type_name, dataset_type_summary in two_graphs_different_numbers.datasets.items(): + if dataset_type_name in ["add_dataset5", "add2_dataset5"]: + self.assertListEqual( + dataset_type_summary.unsuccessful_datasets, + [ + {"instrument": "INSTR", "detector": 0}, + {"instrument": "INSTR", "detector": 0}, + {"instrument": "INSTR", "detector": 1}, + {"instrument": "INSTR", "detector": 2}, + {"instrument": "INSTR", "detector": 3}, + ], + ) + self.assertEqual(dataset_type_summary.n_published, 0) + self.assertEqual(dataset_type_summary.n_unpublished, 0) + self.assertEqual(dataset_type_summary.n_predicted_only, 0) + self.assertEqual(dataset_type_summary.n_expected, 48) + self.assertEqual(dataset_type_summary.n_cursed, 1) + self.assertEqual(dataset_type_summary.n_unsuccessful, 5) + self.assertListEqual( + dataset_type_summary.cursed_datasets, [{"instrument": "INSTR", "detector": 7}] + ) + else: + self.assertListEqual( + dataset_type_summary.unsuccessful_datasets, + [{"instrument": "INSTR", "detector": 0}, {"instrument": "INSTR", "detector": 0}], + ) + self.assertEqual(dataset_type_summary.n_published, 0) + self.assertEqual(dataset_type_summary.n_unpublished, 0) + self.assertEqual(dataset_type_summary.n_predicted_only, 0) + self.assertEqual(dataset_type_summary.n_expected, 2) + self.assertEqual(dataset_type_summary.n_cursed, 0) + self.assertEqual(dataset_type_summary.n_unsuccessful, 2) + self.assertListEqual(dataset_type_summary.cursed_datasets, []) + self.assertIn(dataset_type_name, expected_mock_datasets) + match dataset_type_name: + case name if name in ["add_dataset1", "add2_dataset1", "task0_metadata", "task0_log"]: + self.assertEqual(dataset_type_summary.producer, "task0") + case name if name in ["add_dataset2", "add2_dataset2", "task1_metadata", "task1_log"]: + self.assertEqual(dataset_type_summary.producer, "task1") + case name if name in ["add_dataset3", "add2_dataset3", "task2_metadata", "task2_log"]: + self.assertEqual(dataset_type_summary.producer, "task2") + case name if name in ["add_dataset4", "add2_dataset4", "task3_metadata", "task3_log"]: + self.assertEqual(dataset_type_summary.producer, "task3") + case name if name in ["add_dataset5", "add2_dataset5", "task4_metadata", "task4_log"]: + self.assertEqual(dataset_type_summary.producer, "task4") + + # Now, let's add a task to one model and see if aggregate still + # works + summary4 = summary.model_copy(deep=True) + summary4.tasks["task5"] = TaskSummary( + n_successful = 0, + n_blocked = 0, + n_not_attempted = 1, + n_expected = 1, + failed_quanta = [], + recovered_quanta = [], + wonky_quanta = [], + n_wonky = 0, + n_failed = 0 + ) + summary4.datasets["add_dataset6"] = DatasetTypeSummary( + producer = "task5", + n_published = 0, + n_unpublished = 0, + n_predicted_only = 0, + n_expected = 1, + cursed_datasets = [], + unsuccessful_datasets = [{ + "instrument": "INSTR", + "detector": 0 + }], + n_cursed = 0, + n_unsuccessful = 1 + ) + summary4.datasets["task5_log"] = 
DatasetTypeSummary( + producer = "task5", + n_published = 0, + n_unpublished = 0, + n_predicted_only = 0, + n_expected = 1, + cursed_datasets = [], + unsuccessful_datasets = [{ + "instrument": "INSTR", + "detector": 0 + }], + n_cursed = 0, + n_unsuccessful = 1 + ) + summary4.datasets["task5_metadata"] = DatasetTypeSummary( + producer = "task5", + n_published = 0, + n_unpublished = 0, + n_predicted_only = 0, + n_expected = 1, + cursed_datasets = [], + unsuccessful_datasets = [{ + "instrument": "INSTR", + "detector": 0 + }], + n_cursed = 0, + n_unsuccessful = 1 + ) + two_graphs_extra_task = Summary.aggregate([summary4, summary]) + # Make sure the extra task is in there + self.assertIn("task5", two_graphs_extra_task.tasks) + for task_label, task_summary in two_graphs_extra_task.tasks.items(): + self.assertEqual(task_summary.n_successful, 0) + self.assertEqual(task_summary.n_blocked, 0) + self.assertListEqual(task_summary.failed_quanta, []) + self.assertListEqual(task_summary.recovered_quanta, []) + self.assertListEqual(task_summary.wonky_quanta, []) + self.assertEqual(task_summary.n_wonky, 0) + self.assertEqual(task_summary.n_failed, 0) + self.assertIn(task_label, ["task0", "task1", "task2", "task3", "task4", "task5"]) + if task_label == "task5": + self.assertEqual(task_summary.n_not_attempted, 1) + self.assertEqual(task_summary.n_expected, 1) + else: + self.assertEqual(task_summary.n_not_attempted, 2) + self.assertEqual(task_summary.n_expected, 2) + for dataset_type_name, dataset_type_summary in two_graphs_extra_task.datasets.items(): + self.assertEqual(dataset_type_summary.n_published, 0) + self.assertEqual(dataset_type_summary.n_unpublished, 0) + self.assertEqual(dataset_type_summary.n_predicted_only, 0) + self.assertEqual(dataset_type_summary.n_cursed, 0) + self.assertListEqual(dataset_type_summary.cursed_datasets, []) + if dataset_type_summary.producer == "task5": + self.assertEqual(dataset_type_summary.n_expected, 1) + self.assertEqual(dataset_type_summary.n_unsuccessful, 1) + self.assertListEqual( + dataset_type_summary.unsuccessful_datasets, + [{"instrument": "INSTR", "detector": 0}], + ) + else: + self.assertEqual(dataset_type_summary.n_expected, 2) + self.assertEqual(dataset_type_summary.n_unsuccessful, 2) + self.assertListEqual( + dataset_type_summary.unsuccessful_datasets, + [{"instrument": "INSTR", "detector": 0}, {"instrument": "INSTR", "detector": 0}], + ) + self.assertIn(dataset_type_name, expected_mock_datasets+["add_dataset6", "task5_metadata", "task5_log"]) + match dataset_type_name: + case name if name in ["add_dataset1", "add2_dataset1", "task0_metadata", "task0_log"]: + self.assertEqual(dataset_type_summary.producer, "task0") + case name if name in ["add_dataset2", "add2_dataset2", "task1_metadata", "task1_log"]: + self.assertEqual(dataset_type_summary.producer, "task1") + case name if name in ["add_dataset3", "add2_dataset3", "task2_metadata", "task2_log"]: + self.assertEqual(dataset_type_summary.producer, "task2") + case name if name in ["add_dataset4", "add2_dataset4", "task3_metadata", "task3_log"]: + self.assertEqual(dataset_type_summary.producer, "task3") + case name if name in ["add_dataset5", "add2_dataset5", "task4_metadata", "task4_log"]: + self.assertEqual(dataset_type_summary.producer, "task4") + case name if name in ["add_dataset6", "task5_metadata", "task5_log"]: + self.assertEqual(dataset_type_summary.producer, "task5") + + # Now we test that we properly catch task-dataset mismatches in + # aggregated graphs. 
This is a problem because if task 1 produced + # a certain dataset in graph 1, but task 2 produced the same dataset + # in graph 2, the graphs are likely not comparable. + summary5 = summary.model_copy(deep=True) + summary5.datasets["add_dataset3"] = summary.datasets["add_dataset3"].model_copy(deep=True, update={ + "producer": "task0", + "n_published": 0, + "n_unpublished": 0, + "n_predicted_only": 0, + "n_expected": 1, + "cursed_datasets": [], + "unsuccessful_datasets": [ + { + "instrument": "INSTR", + "detector": 0 + }, + ], + "n_cursed": 0, + "n_unsuccessful": 1 + }) + with self.assertLogs("lsst.pipe.base", level=lsst.utils.logging.VERBOSE) as warning_logs: + should_warn = Summary.aggregate([summary, summary5]) + self.assertIn("WARNING:lsst.pipe.base.quantum_provenance_graph:Producer for dataset type is not consistent: 'task2' != 'task0'.", warning_logs.output[0]) + self.assertIn("WARNING:lsst.pipe.base.quantum_provenance_graph:Ignoring 'task0'.", warning_logs.output[1]) + + # Next up, we're going to try to aggregate summary with a dictionary + # and then with some garbage. Neither of these should work! + with self.assertRaises(TypeError): + Summary.aggregate(summary,{ + "tasks": { + "task0": { + "n_successful": 0, + "n_blocked": 0, + "n_not_attempted": 1, + "n_expected": 1, + "failed_quanta": [], + "recovered_quanta": [], + "wonky_quanta": [], + "n_wonky": 0, + "n_failed": 0 + }, + "datasets": { + "add_dataset1": { + "producer": "task0", + "n_published": 0, + "n_unpublished": 0, + "n_predicted_only": 0, + "n_expected": 1, + "cursed_datasets": [], + "unsuccessful_datasets": [ + { + "instrument": "INSTR", + "detector": 0 + } + ], + "n_cursed": 0, + "n_unsuccessful": 1 + }, + "add2_dataset1": { + "producer": "task0", + "n_published": 0, + "n_unpublished": 0, + "n_predicted_only": 0, + "n_expected": 1, + "cursed_datasets": [], + "unsuccessful_datasets": [ + { + "instrument": "INSTR", + "detector": 0 + } + ], + "n_cursed": 0, + "n_unsuccessful": 1 + }, + "task0_metadata": { + "producer": "task0", + "n_published": 0, + "n_unpublished": 0, + "n_predicted_only": 0, + "n_expected": 1, + "cursed_datasets": [], + "unsuccessful_datasets": [ + { + "instrument": "INSTR", + "detector": 0 + } + ], + "n_cursed": 0, + "n_unsuccessful": 1 + }, + "task0_log": { + "producer": "task0", + "n_published": 0, + "n_unpublished": 0, + "n_predicted_only": 0, + "n_expected": 1, + "cursed_datasets": [], + "unsuccessful_datasets": [ + { + "instrument": "INSTR", + "detector": 0 + } + ], + "n_cursed": 0, + "n_unsuccessful": 1 + },}}}) + Summary.aggregate([summary, []]) + Summary.aggregate([summary, "some_garbage"]) + + + From 04d4017cf4452ece3e2ba446b821fe3bb5887672 Mon Sep 17 00:00:00 2001 From: Orion Eiger Date: Fri, 16 Aug 2024 13:01:07 -0700 Subject: [PATCH 3/4] Add documentation for DM-41605 --- doc/changes/DM-41605.feature.md | 7 ++++++ .../pipe/base/quantum_provenance_graph.py | 22 +++++++++++++++++-- tests/test_quantum_provenance_graph.py | 9 ++++++-- 3 files changed, 34 insertions(+), 4 deletions(-) create mode 100644 doc/changes/DM-41605.feature.md diff --git a/doc/changes/DM-41605.feature.md b/doc/changes/DM-41605.feature.md new file mode 100644 index 00000000..4ee129dc --- /dev/null +++ b/doc/changes/DM-41605.feature.md @@ -0,0 +1,7 @@ +Add functionality to aggregate multiple `QuantumProvenanceGraph.Summary` +objects into one `Summary` for a wholistic report. 
+ +While the `QuantumProvenanceGraph` was designed to resolve processing over +dataquery-identified groups, `QuantumProvenanceGraph.aggregate` is designed to +combine multiple group-level reports into one which totals the successes, +issues and failures over the same section of pipeline. \ No newline at end of file diff --git a/python/lsst/pipe/base/quantum_provenance_graph.py b/python/lsst/pipe/base/quantum_provenance_graph.py index d60862ca..f6949228 100644 --- a/python/lsst/pipe/base/quantum_provenance_graph.py +++ b/python/lsst/pipe/base/quantum_provenance_graph.py @@ -486,7 +486,13 @@ def add_quantum_info(self, info: QuantumInfo, butler: Butler, do_store_logs: boo raise AssertionError(f"Unrecognized quantum status {unrecognized_state!r}") def add_data_id_group(self, other: TaskSummary) -> None: - """Docstring. + """Add information from a `TaskSummary` over one dataquery-identified + group to another, as part of aggregating `Summary` reports. + + Parameters + ---------- + other : `TaskSummary` + `TaskSummary` to aggregate. """ self.n_successful += other.n_successful self.n_blocked += other.n_blocked @@ -639,7 +645,14 @@ def add_dataset_info(self, info: DatasetInfo, producer_info: QuantumInfo) -> Non raise AssertionError(f"Unrecognized dataset status {unrecognized_state!r}") def add_data_id_group(self, other: DatasetTypeSummary) -> None: - """Docstring. + """Add information from a `DatasetTypeSummary` over one + dataquery-identified group to another, as part of aggregating `Summary` + reports. + + Parameters + ---------- + other : `DatasetTypeSummary` + `DatasetTypeSummary` to aggregate. """ if self.producer: # Guard against empty string @@ -677,6 +690,11 @@ def aggregate(cls, summaries: Iterable[Summary]) -> Summary: """Combine summaries from disjoint data id groups into an overall summary of common tasks and datasets. Intended for use when the same pipeline has been run over all groups. + + Parameters + ---------- + summaries : `Iterable[Summary]` + Iterable of all `Summary` objects to aggregate. 
""" result = cls() for summary in summaries: diff --git a/tests/test_quantum_provenance_graph.py b/tests/test_quantum_provenance_graph.py index 0fcfa8b1..9c8b42c8 100644 --- a/tests/test_quantum_provenance_graph.py +++ b/tests/test_quantum_provenance_graph.py @@ -31,10 +31,15 @@ import unittest -from lsst.pipe.base.quantum_provenance_graph import QuantumProvenanceGraph, Summary, TaskSummary, DatasetTypeSummary +import lsst.utils.logging +from lsst.pipe.base.quantum_provenance_graph import ( + DatasetTypeSummary, + QuantumProvenanceGraph, + Summary, + TaskSummary, +) from lsst.pipe.base.tests import simpleQGraph from lsst.utils.tests import temporaryDirectory -import lsst.utils.logging expected_mock_datasets = [ "add_dataset1", From 0639af29dd8e8da27140c88235b1d3c74ee935d5 Mon Sep 17 00:00:00 2001 From: Orion Eiger Date: Mon, 14 Oct 2024 15:50:38 -0700 Subject: [PATCH 4/4] Update variable names and function calls --- .../pipe/base/quantum_provenance_graph.py | 69 +-- tests/test_quantum_provenance_graph.py | 420 ++++++++---------- 2 files changed, 230 insertions(+), 259 deletions(-) diff --git a/python/lsst/pipe/base/quantum_provenance_graph.py b/python/lsst/pipe/base/quantum_provenance_graph.py index f6949228..e2674cfa 100644 --- a/python/lsst/pipe/base/quantum_provenance_graph.py +++ b/python/lsst/pipe/base/quantum_provenance_graph.py @@ -42,7 +42,7 @@ import itertools import logging import uuid -from collections.abc import Iterator, Iterable, Sequence +from collections.abc import Iterator, Sequence from enum import Enum from typing import TYPE_CHECKING, ClassVar, Literal, NamedTuple, TypedDict, cast @@ -484,25 +484,25 @@ def add_quantum_info(self, info: QuantumInfo, butler: Butler, do_store_logs: boo self.n_unknown += 1 case unrecognized_state: raise AssertionError(f"Unrecognized quantum status {unrecognized_state!r}") - - def add_data_id_group(self, other: TaskSummary) -> None: + + def add_data_id_group(self, other_summary: TaskSummary) -> None: """Add information from a `TaskSummary` over one dataquery-identified - group to another, as part of aggregating `Summary` reports. + group to another, as part of aggregating `Summary` reports. Parameters ---------- - other : `TaskSummary` + other_summary : `TaskSummary` `TaskSummary` to aggregate. """ - self.n_successful += other.n_successful - self.n_blocked += other.n_blocked - self.n_not_attempted += other.n_not_attempted - self.n_expected += other.n_expected + self.n_successful += other_summary.n_successful + self.n_blocked += other_summary.n_blocked + self.n_unknown += other_summary.n_unknown + self.n_expected += other_summary.n_expected + + self.wonky_quanta.extend(other_summary.wonky_quanta) + self.recovered_quanta.extend(other_summary.recovered_quanta) + self.failed_quanta.extend(other_summary.failed_quanta) - self.wonky_quanta.extend(other.wonky_quanta) - self.recovered_quanta.extend(other.recovered_quanta) - self.failed_quanta.extend(other.failed_quanta) - class CursedDatasetSummary(pydantic.BaseModel): """A summary of all the relevant information on a cursed dataset.""" @@ -567,7 +567,7 @@ class DatasetTypeSummary(pydantic.BaseModel): runs. """ - producer: str + producer: str = "" """The name of the task which produced this dataset. 
""" @@ -643,32 +643,37 @@ def add_dataset_info(self, info: DatasetInfo, producer_info: QuantumInfo) -> Non self.n_predicted_only += 1 case unrecognized_state: raise AssertionError(f"Unrecognized dataset status {unrecognized_state!r}") - - def add_data_id_group(self, other: DatasetTypeSummary) -> None: + + def add_data_id_group(self, other_summary: DatasetTypeSummary) -> None: """Add information from a `DatasetTypeSummary` over one dataquery-identified group to another, as part of aggregating `Summary` - reports. + reports. Parameters ---------- - other : `DatasetTypeSummary` + other_summary : `DatasetTypeSummary` `DatasetTypeSummary` to aggregate. """ - if self.producer: + if self.producer and other_summary.producer: # Guard against empty string - if self.producer != other.producer: - _LOG.warning("Producer for dataset type is not consistent: %r != %r.", self.producer, other.producer) - _LOG.warning("Ignoring %r.", other.producer) + if self.producer != other_summary.producer: + _LOG.warning( + "Producer for dataset type is not consistent: %r != %r.", + self.producer, + other_summary.producer, + ) + _LOG.warning("Ignoring %r.", other_summary.producer) else: - self.producer = other.producer + if other_summary.producer and not self.producer: + self.producer = other_summary.producer - self.n_published += other.n_published - self.n_unpublished += other.n_unpublished - self.n_predicted_only += other.n_predicted_only - self.n_expected += other.n_expected + self.n_visible += other_summary.n_visible + self.n_shadowed += other_summary.n_shadowed + self.n_predicted_only += other_summary.n_predicted_only + self.n_expected += other_summary.n_expected - self.cursed_datasets.extend(other.cursed_datasets) - self.unsuccessful_datasets.extend(other.unsuccessful_datasets) + self.cursed_datasets.extend(other_summary.cursed_datasets) + self.unsuccessful_datasets.extend(other_summary.unsuccessful_datasets) class Summary(pydantic.BaseModel): @@ -686,15 +691,15 @@ class Summary(pydantic.BaseModel): """ @classmethod - def aggregate(cls, summaries: Iterable[Summary]) -> Summary: + def aggregate(cls, summaries: Sequence[Summary]) -> Summary: """Combine summaries from disjoint data id groups into an overall summary of common tasks and datasets. Intended for use when the same pipeline has been run over all groups. Parameters ---------- - summaries : `Iterable[Summary]` - Iterable of all `Summary` objects to aggregate. + summaries : `Sequence[Summary]` + Sequence of all `Summary` objects to aggregate. """ result = cls() for summary in summaries: diff --git a/tests/test_quantum_provenance_graph.py b/tests/test_quantum_provenance_graph.py index 9c8b42c8..d5077812 100644 --- a/tests/test_quantum_provenance_graph.py +++ b/tests/test_quantum_provenance_graph.py @@ -74,6 +74,7 @@ class QuantumProvenanceGraphTestCase(unittest.TestCase): More tests are in lsst/ci_middleware/tests/test_prod_outputs.py and lsst/ci_middleware/tests/test_rc2_outputs.py """ + def test_qpg_reports(self) -> None: """Test that we can add a new graph to the `QuantumProvenanceGraph`. 
@@ -98,28 +99,7 @@ def test_qpg_reports(self) -> None: self.assertListEqual(task_summary.wonky_quanta, []) self.assertEqual(task_summary.n_wonky, 0) self.assertEqual(task_summary.n_failed, 0) - expected_mock_datasets = [ - "add_dataset1", - "add2_dataset1", - "task0_metadata", - "task0_log", - "add_dataset2", - "add2_dataset2", - "task1_metadata", - "task1_log", - "add_dataset3", - "add2_dataset3", - "task2_metadata", - "task2_log", - "add_dataset4", - "add2_dataset4", - "task3_metadata", - "task3_log", - "add_dataset5", - "add2_dataset5", - "task4_metadata", - "task4_log", - ] + for dataset_type_name, dataset_type_summary in summary.datasets.items(): self.assertListEqual( dataset_type_summary.unsuccessful_datasets, @@ -153,16 +133,12 @@ def test_qpg_reports(self) -> None: self.assertEqual(dataset_type_summary.producer, "task4") def test_aggregate_reports(self) -> None: - """Test aggregating reports from the `QuantumProvenanceGraph.` - - More tests are in lsst/ci_middleware/tests/test_prod_outputs.py and - lsst/ci_middleware/tests/test_rc2_outputs.py""" + """Test aggregating reports from the `QuantumProvenanceGraph.`""" with temporaryDirectory() as root: # make a simple qgraph to make an execution report on butler, qgraph = simpleQGraph.makeSimpleQGraph(root=root) qpg = QuantumProvenanceGraph() - qpg.add_new_graph(butler, qgraph) - qpg.resolve_duplicates(butler) + qpg.assemble_quantum_provenance_graph(butler, [qgraph]) summary = qpg.to_summary(butler) # Check that aggregating one summary only does not cause an error one_graph_only_sum = Summary.aggregate([summary]) @@ -173,7 +149,7 @@ def test_aggregate_reports(self) -> None: for task_summary in one_graph_only_sum.tasks.values(): self.assertEqual(task_summary.n_successful, 0) self.assertEqual(task_summary.n_blocked, 0) - self.assertEqual(task_summary.n_not_attempted, 1) + self.assertEqual(task_summary.n_unknown, 1) self.assertEqual(task_summary.n_expected, 1) self.assertListEqual(task_summary.failed_quanta, []) self.assertListEqual(task_summary.recovered_quanta, []) @@ -184,8 +160,8 @@ def test_aggregate_reports(self) -> None: self.assertListEqual( dataset_type_summary.unsuccessful_datasets, [{"instrument": "INSTR", "detector": 0}] ) - self.assertEqual(dataset_type_summary.n_published, 0) - self.assertEqual(dataset_type_summary.n_unpublished, 0) + self.assertEqual(dataset_type_summary.n_visible, 0) + self.assertEqual(dataset_type_summary.n_shadowed, 0) self.assertEqual(dataset_type_summary.n_predicted_only, 0) self.assertEqual(dataset_type_summary.n_expected, 1) self.assertEqual(dataset_type_summary.n_cursed, 0) @@ -212,7 +188,7 @@ def test_aggregate_reports(self) -> None: for task_summary in two_identical_graph_sum.tasks.values(): self.assertEqual(task_summary.n_successful, 0) self.assertEqual(task_summary.n_blocked, 0) - self.assertEqual(task_summary.n_not_attempted, 2) + self.assertEqual(task_summary.n_unknown, 2) self.assertEqual(task_summary.n_expected, 2) self.assertListEqual(task_summary.failed_quanta, []) self.assertListEqual(task_summary.recovered_quanta, []) @@ -224,8 +200,8 @@ def test_aggregate_reports(self) -> None: dataset_type_summary.unsuccessful_datasets, [{"instrument": "INSTR", "detector": 0}, {"instrument": "INSTR", "detector": 0}], ) - self.assertEqual(dataset_type_summary.n_published, 0) - self.assertEqual(dataset_type_summary.n_unpublished, 0) + self.assertEqual(dataset_type_summary.n_visible, 0) + self.assertEqual(dataset_type_summary.n_shadowed, 0) self.assertEqual(dataset_type_summary.n_predicted_only, 0) 
self.assertEqual(dataset_type_summary.n_expected, 2) self.assertEqual(dataset_type_summary.n_cursed, 0) @@ -244,7 +220,6 @@ def test_aggregate_reports(self) -> None: case name if name in ["add_dataset5", "add2_dataset5", "task4_metadata", "task4_log"]: self.assertEqual(dataset_type_summary.producer, "task4") - # Let's see if we can change lots of counts and info for a task # which exists in summary 1 and append to the overall summary # effectively. This summary has a lot of valid variations. @@ -254,7 +229,7 @@ def test_aggregate_reports(self) -> None: update={ "n_successful": 10, "n_blocked": 20, - "n_not_attempted": 4, + "n_unknown": 4, "n_expected": 47, "failed_quanta": [ { @@ -293,8 +268,8 @@ def test_aggregate_reports(self) -> None: deep=True, update={ "producer": "task4", - "n_published": 0, - "n_unpublished": 0, + "n_visible": 0, + "n_shadowed": 0, "n_predicted_only": 0, "n_expected": 47, "cursed_datasets": [{"instrument": "INSTR", "detector": 7}], @@ -310,8 +285,8 @@ def test_aggregate_reports(self) -> None: deep=True, update={ "producer": "task4", - "n_published": 0, - "n_unpublished": 0, + "n_visible": 0, + "n_shadowed": 0, "n_predicted_only": 0, "n_expected": 47, "cursed_datasets": [ @@ -331,44 +306,52 @@ def test_aggregate_reports(self) -> None: if task_label == "task4": self.assertEqual(task_summary.n_successful, 10) self.assertEqual(task_summary.n_blocked, 20) - self.assertEqual(task_summary.n_not_attempted, 5) + self.assertEqual(task_summary.n_unknown, 5) self.assertEqual(task_summary.n_expected, 48) - self.assertListEqual(task_summary.failed_quanta, [ - { - "data_id": {"instrument": "INSTR", "detector": 1}, - "runs": {"run1": "failed"}, - "messages": ["Error on detector 1", "Second error on detector 1"], - }, - { - "data_id": {"instrument": "INSTR", "detector": 2}, - "runs": {"run1": "failed", "run2": "failed"}, - "messages": [], - }, - { - "data_id": {"instrument": "INSTR", "detector": 3}, - "runs": {"run1": "failed"}, - "messages": ["Error on detector 3"], - }, - ]) - self.assertListEqual(task_summary.recovered_quanta, [ - {"instrument": "INSTR", "detector": 4}, - {"instrument": "INSTR", "detector": 5}, - {"instrument": "INSTR", "detector": 6}, - ]) - self.assertListEqual(task_summary.wonky_quanta, - [{ - "data_id": {"instrument": "INSTR", "detector": 7}, - "runs": {"run1": "successful", "run2": "failed"}, - "messages": [ - "This one is wonky because it moved from successful to failed." 
- ], - }]) + self.assertListEqual( + task_summary.failed_quanta, + [ + { + "data_id": {"instrument": "INSTR", "detector": 1}, + "runs": {"run1": "failed"}, + "messages": ["Error on detector 1", "Second error on detector 1"], + }, + { + "data_id": {"instrument": "INSTR", "detector": 2}, + "runs": {"run1": "failed", "run2": "failed"}, + "messages": [], + }, + { + "data_id": {"instrument": "INSTR", "detector": 3}, + "runs": {"run1": "failed"}, + "messages": ["Error on detector 3"], + }, + ], + ) + self.assertListEqual( + task_summary.recovered_quanta, + [ + {"instrument": "INSTR", "detector": 4}, + {"instrument": "INSTR", "detector": 5}, + {"instrument": "INSTR", "detector": 6}, + ], + ) + self.assertListEqual( + task_summary.wonky_quanta, + [ + { + "data_id": {"instrument": "INSTR", "detector": 7}, + "runs": {"run1": "successful", "run2": "failed"}, + "messages": ["This one is wonky because it moved from successful to failed."], + } + ], + ) self.assertEqual(task_summary.n_wonky, 1) self.assertEqual(task_summary.n_failed, 3) else: self.assertEqual(task_summary.n_successful, 0) self.assertEqual(task_summary.n_blocked, 0) - self.assertEqual(task_summary.n_not_attempted, 2) + self.assertEqual(task_summary.n_unknown, 2) self.assertEqual(task_summary.n_expected, 2) self.assertListEqual(task_summary.failed_quanta, []) self.assertListEqual(task_summary.recovered_quanta, []) @@ -387,8 +370,8 @@ def test_aggregate_reports(self) -> None: {"instrument": "INSTR", "detector": 3}, ], ) - self.assertEqual(dataset_type_summary.n_published, 0) - self.assertEqual(dataset_type_summary.n_unpublished, 0) + self.assertEqual(dataset_type_summary.n_visible, 0) + self.assertEqual(dataset_type_summary.n_shadowed, 0) self.assertEqual(dataset_type_summary.n_predicted_only, 0) self.assertEqual(dataset_type_summary.n_expected, 48) self.assertEqual(dataset_type_summary.n_cursed, 1) @@ -401,8 +384,8 @@ def test_aggregate_reports(self) -> None: dataset_type_summary.unsuccessful_datasets, [{"instrument": "INSTR", "detector": 0}, {"instrument": "INSTR", "detector": 0}], ) - self.assertEqual(dataset_type_summary.n_published, 0) - self.assertEqual(dataset_type_summary.n_unpublished, 0) + self.assertEqual(dataset_type_summary.n_visible, 0) + self.assertEqual(dataset_type_summary.n_shadowed, 0) self.assertEqual(dataset_type_summary.n_predicted_only, 0) self.assertEqual(dataset_type_summary.n_expected, 2) self.assertEqual(dataset_type_summary.n_cursed, 0) @@ -425,57 +408,48 @@ def test_aggregate_reports(self) -> None: # works summary4 = summary.model_copy(deep=True) summary4.tasks["task5"] = TaskSummary( - n_successful = 0, - n_blocked = 0, - n_not_attempted = 1, - n_expected = 1, - failed_quanta = [], - recovered_quanta = [], - wonky_quanta = [], - n_wonky = 0, - n_failed = 0 + n_successful=0, + n_blocked=0, + n_unknown=1, + n_expected=1, + failed_quanta=[], + recovered_quanta=[], + wonky_quanta=[], + n_wonky=0, + n_failed=0, ) summary4.datasets["add_dataset6"] = DatasetTypeSummary( - producer = "task5", - n_published = 0, - n_unpublished = 0, - n_predicted_only = 0, - n_expected = 1, - cursed_datasets = [], - unsuccessful_datasets = [{ - "instrument": "INSTR", - "detector": 0 - }], - n_cursed = 0, - n_unsuccessful = 1 + producer="task5", + n_visible=0, + n_shadowed=0, + n_predicted_only=0, + n_expected=1, + cursed_datasets=[], + unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}], + n_cursed=0, + n_unsuccessful=1, ) summary4.datasets["task5_log"] = DatasetTypeSummary( - producer = "task5", - n_published = 0, - 
n_unpublished = 0, - n_predicted_only = 0, - n_expected = 1, - cursed_datasets = [], - unsuccessful_datasets = [{ - "instrument": "INSTR", - "detector": 0 - }], - n_cursed = 0, - n_unsuccessful = 1 + producer="task5", + n_visible=0, + n_shadowed=0, + n_predicted_only=0, + n_expected=1, + cursed_datasets=[], + unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}], + n_cursed=0, + n_unsuccessful=1, ) summary4.datasets["task5_metadata"] = DatasetTypeSummary( - producer = "task5", - n_published = 0, - n_unpublished = 0, - n_predicted_only = 0, - n_expected = 1, - cursed_datasets = [], - unsuccessful_datasets = [{ - "instrument": "INSTR", - "detector": 0 - }], - n_cursed = 0, - n_unsuccessful = 1 + producer="task5", + n_visible=0, + n_shadowed=0, + n_predicted_only=0, + n_expected=1, + cursed_datasets=[], + unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}], + n_cursed=0, + n_unsuccessful=1, ) two_graphs_extra_task = Summary.aggregate([summary4, summary]) # Make sure the extra task is in there @@ -490,14 +464,14 @@ def test_aggregate_reports(self) -> None: self.assertEqual(task_summary.n_failed, 0) self.assertIn(task_label, ["task0", "task1", "task2", "task3", "task4", "task5"]) if task_label == "task5": - self.assertEqual(task_summary.n_not_attempted, 1) + self.assertEqual(task_summary.n_unknown, 1) self.assertEqual(task_summary.n_expected, 1) else: - self.assertEqual(task_summary.n_not_attempted, 2) + self.assertEqual(task_summary.n_unknown, 2) self.assertEqual(task_summary.n_expected, 2) for dataset_type_name, dataset_type_summary in two_graphs_extra_task.datasets.items(): - self.assertEqual(dataset_type_summary.n_published, 0) - self.assertEqual(dataset_type_summary.n_unpublished, 0) + self.assertEqual(dataset_type_summary.n_visible, 0) + self.assertEqual(dataset_type_summary.n_shadowed, 0) self.assertEqual(dataset_type_summary.n_predicted_only, 0) self.assertEqual(dataset_type_summary.n_cursed, 0) self.assertListEqual(dataset_type_summary.cursed_datasets, []) @@ -505,17 +479,20 @@ def test_aggregate_reports(self) -> None: self.assertEqual(dataset_type_summary.n_expected, 1) self.assertEqual(dataset_type_summary.n_unsuccessful, 1) self.assertListEqual( - dataset_type_summary.unsuccessful_datasets, - [{"instrument": "INSTR", "detector": 0}], - ) + dataset_type_summary.unsuccessful_datasets, + [{"instrument": "INSTR", "detector": 0}], + ) else: self.assertEqual(dataset_type_summary.n_expected, 2) self.assertEqual(dataset_type_summary.n_unsuccessful, 2) self.assertListEqual( - dataset_type_summary.unsuccessful_datasets, - [{"instrument": "INSTR", "detector": 0}, {"instrument": "INSTR", "detector": 0}], + dataset_type_summary.unsuccessful_datasets, + [{"instrument": "INSTR", "detector": 0}, {"instrument": "INSTR", "detector": 0}], + ) + self.assertIn( + dataset_type_name, + expected_mock_datasets + ["add_dataset6", "task5_metadata", "task5_log"], ) - self.assertIn(dataset_type_name, expected_mock_datasets+["add_dataset6", "task5_metadata", "task5_log"]) match dataset_type_name: case name if name in ["add_dataset1", "add2_dataset1", "task0_metadata", "task0_log"]: self.assertEqual(dataset_type_summary.producer, "task0") @@ -535,110 +512,99 @@ def test_aggregate_reports(self) -> None: # a certain dataset in graph 1, but task 2 produced the same dataset # in graph 2, the graphs are likely not comparable. 
summary5 = summary.model_copy(deep=True) - summary5.datasets["add_dataset3"] = summary.datasets["add_dataset3"].model_copy(deep=True, update={ - "producer": "task0", - "n_published": 0, - "n_unpublished": 0, - "n_predicted_only": 0, - "n_expected": 1, - "cursed_datasets": [], - "unsuccessful_datasets": [ - { - "instrument": "INSTR", - "detector": 0 - }, - ], - "n_cursed": 0, - "n_unsuccessful": 1 - }) + summary5.datasets["add_dataset3"] = summary.datasets["add_dataset3"].model_copy( + deep=True, + update={ + "producer": "task0", + "n_visible": 0, + "n_shadowed": 0, + "n_predicted_only": 0, + "n_expected": 1, + "cursed_datasets": [], + "unsuccessful_datasets": [ + {"instrument": "INSTR", "detector": 0}, + ], + "n_cursed": 0, + "n_unsuccessful": 1, + }, + ) with self.assertLogs("lsst.pipe.base", level=lsst.utils.logging.VERBOSE) as warning_logs: - should_warn = Summary.aggregate([summary, summary5]) - self.assertIn("WARNING:lsst.pipe.base.quantum_provenance_graph:Producer for dataset type is not consistent: 'task2' != 'task0'.", warning_logs.output[0]) - self.assertIn("WARNING:lsst.pipe.base.quantum_provenance_graph:Ignoring 'task0'.", warning_logs.output[1]) + Summary.aggregate([summary, summary5]) + self.assertIn( + "WARNING:lsst.pipe.base.quantum_provenance_graph:Producer for dataset type is not consistent" + ": 'task2' != 'task0'.", + warning_logs.output[0], + ) + self.assertIn( + "WARNING:lsst.pipe.base.quantum_provenance_graph:Ignoring 'task0'.", warning_logs.output[1] + ) # Next up, we're going to try to aggregate summary with a dictionary # and then with some garbage. Neither of these should work! with self.assertRaises(TypeError): - Summary.aggregate(summary,{ + Summary.aggregate( + summary, + { "tasks": { "task0": { - "n_successful": 0, - "n_blocked": 0, - "n_not_attempted": 1, - "n_expected": 1, - "failed_quanta": [], - "recovered_quanta": [], - "wonky_quanta": [], - "n_wonky": 0, - "n_failed": 0 - }, - "datasets": { - "add_dataset1": { - "producer": "task0", - "n_published": 0, - "n_unpublished": 0, - "n_predicted_only": 0, - "n_expected": 1, - "cursed_datasets": [], - "unsuccessful_datasets": [ - { - "instrument": "INSTR", - "detector": 0 - } - ], - "n_cursed": 0, - "n_unsuccessful": 1 + "n_successful": 0, + "n_blocked": 0, + "n_unknown": 1, + "n_expected": 1, + "failed_quanta": [], + "recovered_quanta": [], + "wonky_quanta": [], + "n_wonky": 0, + "n_failed": 0, }, - "add2_dataset1": { - "producer": "task0", - "n_published": 0, - "n_unpublished": 0, - "n_predicted_only": 0, - "n_expected": 1, - "cursed_datasets": [], - "unsuccessful_datasets": [ - { - "instrument": "INSTR", - "detector": 0 - } - ], - "n_cursed": 0, - "n_unsuccessful": 1 + "datasets": { + "add_dataset1": { + "producer": "task0", + "n_visible": 0, + "n_shadowed": 0, + "n_predicted_only": 0, + "n_expected": 1, + "cursed_datasets": [], + "unsuccessful_datasets": [{"instrument": "INSTR", "detector": 0}], + "n_cursed": 0, + "n_unsuccessful": 1, + }, + "add2_dataset1": { + "producer": "task0", + "n_visible": 0, + "n_shadowed": 0, + "n_predicted_only": 0, + "n_expected": 1, + "cursed_datasets": [], + "unsuccessful_datasets": [{"instrument": "INSTR", "detector": 0}], + "n_cursed": 0, + "n_unsuccessful": 1, + }, + "task0_metadata": { + "producer": "task0", + "n_visible": 0, + "n_shadowed": 0, + "n_predicted_only": 0, + "n_expected": 1, + "cursed_datasets": [], + "unsuccessful_datasets": [{"instrument": "INSTR", "detector": 0}], + "n_cursed": 0, + "n_unsuccessful": 1, + }, + "task0_log": { + "producer": "task0", + 
"n_visible": 0, + "n_shadowed": 0, + "n_predicted_only": 0, + "n_expected": 1, + "cursed_datasets": [], + "unsuccessful_datasets": [{"instrument": "INSTR", "detector": 0}], + "n_cursed": 0, + "n_unsuccessful": 1, + }, }, - "task0_metadata": { - "producer": "task0", - "n_published": 0, - "n_unpublished": 0, - "n_predicted_only": 0, - "n_expected": 1, - "cursed_datasets": [], - "unsuccessful_datasets": [ - { - "instrument": "INSTR", - "detector": 0 - } - ], - "n_cursed": 0, - "n_unsuccessful": 1 - }, - "task0_log": { - "producer": "task0", - "n_published": 0, - "n_unpublished": 0, - "n_predicted_only": 0, - "n_expected": 1, - "cursed_datasets": [], - "unsuccessful_datasets": [ - { - "instrument": "INSTR", - "detector": 0 - } - ], - "n_cursed": 0, - "n_unsuccessful": 1 - },}}}) + } + }, + ) Summary.aggregate([summary, []]) Summary.aggregate([summary, "some_garbage"]) - - -