diff --git a/doc/changes/DM-41605.feature.md b/doc/changes/DM-41605.feature.md new file mode 100644 index 000000000..4ee129dce --- /dev/null +++ b/doc/changes/DM-41605.feature.md @@ -0,0 +1,7 @@ +Add functionality to aggregate multiple `QuantumProvenanceGraph.Summary` +objects into one `Summary` for a holistic report. + +While the `QuantumProvenanceGraph` was designed to resolve processing over +data query-identified groups, `QuantumProvenanceGraph.Summary.aggregate` is +designed to combine multiple group-level reports into one that totals the +successes, issues, and failures over the same section of the pipeline. \ No newline at end of file diff --git a/python/lsst/pipe/base/quantum_provenance_graph.py b/python/lsst/pipe/base/quantum_provenance_graph.py index 58f036e5b..e2674cfae 100644 --- a/python/lsst/pipe/base/quantum_provenance_graph.py +++ b/python/lsst/pipe/base/quantum_provenance_graph.py @@ -485,6 +485,24 @@ def add_quantum_info(self, info: QuantumInfo, butler: Butler, do_store_logs: boo case unrecognized_state: raise AssertionError(f"Unrecognized quantum status {unrecognized_state!r}") + def add_data_id_group(self, other_summary: TaskSummary) -> None: + """Add information from a `TaskSummary` over one data query-identified + group to another, as part of aggregating `Summary` reports. + + Parameters + ---------- + other_summary : `TaskSummary` + `TaskSummary` to aggregate. + """ + self.n_successful += other_summary.n_successful + self.n_blocked += other_summary.n_blocked + self.n_unknown += other_summary.n_unknown + self.n_expected += other_summary.n_expected + + self.wonky_quanta.extend(other_summary.wonky_quanta) + self.recovered_quanta.extend(other_summary.recovered_quanta) + self.failed_quanta.extend(other_summary.failed_quanta) + class CursedDatasetSummary(pydantic.BaseModel): """A summary of all the relevant information on a cursed dataset.""" @@ -549,7 +567,7 @@ class DatasetTypeSummary(pydantic.BaseModel): runs. """ - producer: str + producer: str = "" """The name of the task which produced this dataset. """ @@ -626,6 +644,37 @@ def add_dataset_info(self, info: DatasetInfo, producer_info: QuantumInfo) -> Non case unrecognized_state: raise AssertionError(f"Unrecognized dataset status {unrecognized_state!r}") + def add_data_id_group(self, other_summary: DatasetTypeSummary) -> None: + """Add information from a `DatasetTypeSummary` over one + data query-identified group to another, as part of aggregating `Summary` + reports. + + Parameters + ---------- + other_summary : `DatasetTypeSummary` + `DatasetTypeSummary` to aggregate. + """ + if self.producer and other_summary.producer: + # Guard against empty string + if self.producer != other_summary.producer: + _LOG.warning( + "Producer for dataset type is not consistent: %r != %r.", + self.producer, + other_summary.producer, + ) + _LOG.warning("Ignoring %r.", other_summary.producer) + else: + if other_summary.producer and not self.producer: + self.producer = other_summary.producer + + self.n_visible += other_summary.n_visible + self.n_shadowed += other_summary.n_shadowed + self.n_predicted_only += other_summary.n_predicted_only + self.n_expected += other_summary.n_expected + + self.cursed_datasets.extend(other_summary.cursed_datasets) + self.unsuccessful_datasets.extend(other_summary.unsuccessful_datasets) + class Summary(pydantic.BaseModel): """A summary of the contents of the QuantumProvenanceGraph, including @@ -641,6 +690,32 @@ class Summary(pydantic.BaseModel): """Summaries for the datasets.
""" + @classmethod + def aggregate(cls, summaries: Sequence[Summary]) -> Summary: + """Combine summaries from disjoint data id groups into an overall + summary of common tasks and datasets. Intended for use when the same + pipeline has been run over all groups. + + Parameters + ---------- + summaries : `Sequence[Summary]` + Sequence of all `Summary` objects to aggregate. + """ + result = cls() + for summary in summaries: + for label, task_summary in summary.tasks.items(): + result_task_summary = result.tasks.setdefault(label, TaskSummary()) + result_task_summary.add_data_id_group(task_summary) + for dataset_type, dataset_type_summary in summary.datasets.items(): + result_dataset_summary = result.datasets.setdefault(dataset_type, DatasetTypeSummary()) + result_dataset_summary.add_data_id_group(dataset_type_summary) + return result + class QuantumProvenanceGraph: """A set of already-run, merged quantum graphs with provenance diff --git a/tests/test_quantum_provenance_graph.py b/tests/test_quantum_provenance_graph.py index cdb2c2c91..d50778125 100644 --- a/tests/test_quantum_provenance_graph.py +++ b/tests/test_quantum_provenance_graph.py @@ -31,10 +31,39 @@ import unittest -from lsst.pipe.base.quantum_provenance_graph import QuantumProvenanceGraph +import lsst.utils.logging +from lsst.pipe.base.quantum_provenance_graph import ( + DatasetTypeSummary, + QuantumProvenanceGraph, + Summary, + TaskSummary, +) from lsst.pipe.base.tests import simpleQGraph from lsst.utils.tests import temporaryDirectory +expected_mock_datasets = [ + "add_dataset1", + "add2_dataset1", + "task0_metadata", + "task0_log", + "add_dataset2", + "add2_dataset2", + "task1_metadata", + "task1_log", + "add_dataset3", + "add2_dataset3", + "task2_metadata", + "task2_log", + "add_dataset4", + "add2_dataset4", + "task3_metadata", + "task3_log", + "add_dataset5", + "add2_dataset5", + "task4_metadata", + "task4_log", +] + class QuantumProvenanceGraphTestCase(unittest.TestCase): """Test reports from the QuantumProvenanceGraph. 
@@ -70,28 +99,7 @@ def test_qpg_reports(self) -> None: self.assertListEqual(task_summary.wonky_quanta, []) self.assertEqual(task_summary.n_wonky, 0) self.assertEqual(task_summary.n_failed, 0) - expected_mock_datasets = [ - "add_dataset1", - "add2_dataset1", - "task0_metadata", - "task0_log", - "add_dataset2", - "add2_dataset2", - "task1_metadata", - "task1_log", - "add_dataset3", - "add2_dataset3", - "task2_metadata", - "task2_log", - "add_dataset4", - "add2_dataset4", - "task3_metadata", - "task3_log", - "add_dataset5", - "add2_dataset5", - "task4_metadata", - "task4_log", - ] + for dataset_type_name, dataset_type_summary in summary.datasets.items(): self.assertListEqual( dataset_type_summary.unsuccessful_datasets, @@ -123,3 +131,480 @@ def test_qpg_reports(self) -> None: self.assertEqual(dataset_type_summary.producer, "task3") case name if name in ["add_dataset5", "add2_dataset5", "task4_metadata", "task4_log"]: self.assertEqual(dataset_type_summary.producer, "task4") + + def test_aggregate_reports(self) -> None: + """Test aggregating reports from the `QuantumProvenanceGraph.`""" + with temporaryDirectory() as root: + # make a simple qgraph to make an execution report on + butler, qgraph = simpleQGraph.makeSimpleQGraph(root=root) + qpg = QuantumProvenanceGraph() + qpg.assemble_quantum_provenance_graph(butler, [qgraph]) + summary = qpg.to_summary(butler) + # Check that aggregating one summary only does not cause an error + one_graph_only_sum = Summary.aggregate([summary]) + + # Do the same tests as in `test_qpg_reports`, but on the + # 'aggregate' summary. Essentially, verify that the information in + # the report is preserved during the aggregation step. + for task_summary in one_graph_only_sum.tasks.values(): + self.assertEqual(task_summary.n_successful, 0) + self.assertEqual(task_summary.n_blocked, 0) + self.assertEqual(task_summary.n_unknown, 1) + self.assertEqual(task_summary.n_expected, 1) + self.assertListEqual(task_summary.failed_quanta, []) + self.assertListEqual(task_summary.recovered_quanta, []) + self.assertListEqual(task_summary.wonky_quanta, []) + self.assertEqual(task_summary.n_wonky, 0) + self.assertEqual(task_summary.n_failed, 0) + for dataset_type_name, dataset_type_summary in one_graph_only_sum.datasets.items(): + self.assertListEqual( + dataset_type_summary.unsuccessful_datasets, [{"instrument": "INSTR", "detector": 0}] + ) + self.assertEqual(dataset_type_summary.n_visible, 0) + self.assertEqual(dataset_type_summary.n_shadowed, 0) + self.assertEqual(dataset_type_summary.n_predicted_only, 0) + self.assertEqual(dataset_type_summary.n_expected, 1) + self.assertEqual(dataset_type_summary.n_cursed, 0) + self.assertEqual(dataset_type_summary.n_unsuccessful, 1) + self.assertListEqual(dataset_type_summary.cursed_datasets, []) + self.assertIn(dataset_type_name, expected_mock_datasets) + match dataset_type_name: + case name if name in ["add_dataset1", "add2_dataset1", "task0_metadata", "task0_log"]: + self.assertEqual(dataset_type_summary.producer, "task0") + case name if name in ["add_dataset2", "add2_dataset2", "task1_metadata", "task1_log"]: + self.assertEqual(dataset_type_summary.producer, "task1") + case name if name in ["add_dataset3", "add2_dataset3", "task2_metadata", "task2_log"]: + self.assertEqual(dataset_type_summary.producer, "task2") + case name if name in ["add_dataset4", "add2_dataset4", "task3_metadata", "task3_log"]: + self.assertEqual(dataset_type_summary.producer, "task3") + case name if name in ["add_dataset5", "add2_dataset5", "task4_metadata", 
"task4_log"]: + self.assertEqual(dataset_type_summary.producer, "task4") + + # Now we test aggregating multiple summaries. First, we try + # aggregating with an exact copy and make sure we just have double + # the numbers. + summary2 = summary.model_copy(deep=True) + two_identical_graph_sum = Summary.aggregate([summary, summary2]) + for task_summary in two_identical_graph_sum.tasks.values(): + self.assertEqual(task_summary.n_successful, 0) + self.assertEqual(task_summary.n_blocked, 0) + self.assertEqual(task_summary.n_unknown, 2) + self.assertEqual(task_summary.n_expected, 2) + self.assertListEqual(task_summary.failed_quanta, []) + self.assertListEqual(task_summary.recovered_quanta, []) + self.assertListEqual(task_summary.wonky_quanta, []) + self.assertEqual(task_summary.n_wonky, 0) + self.assertEqual(task_summary.n_failed, 0) + for dataset_type_name, dataset_type_summary in two_identical_graph_sum.datasets.items(): + self.assertListEqual( + dataset_type_summary.unsuccessful_datasets, + [{"instrument": "INSTR", "detector": 0}, {"instrument": "INSTR", "detector": 0}], + ) + self.assertEqual(dataset_type_summary.n_visible, 0) + self.assertEqual(dataset_type_summary.n_shadowed, 0) + self.assertEqual(dataset_type_summary.n_predicted_only, 0) + self.assertEqual(dataset_type_summary.n_expected, 2) + self.assertEqual(dataset_type_summary.n_cursed, 0) + self.assertEqual(dataset_type_summary.n_unsuccessful, 2) + self.assertListEqual(dataset_type_summary.cursed_datasets, []) + self.assertIn(dataset_type_name, expected_mock_datasets) + match dataset_type_name: + case name if name in ["add_dataset1", "add2_dataset1", "task0_metadata", "task0_log"]: + self.assertEqual(dataset_type_summary.producer, "task0") + case name if name in ["add_dataset2", "add2_dataset2", "task1_metadata", "task1_log"]: + self.assertEqual(dataset_type_summary.producer, "task1") + case name if name in ["add_dataset3", "add2_dataset3", "task2_metadata", "task2_log"]: + self.assertEqual(dataset_type_summary.producer, "task2") + case name if name in ["add_dataset4", "add2_dataset4", "task3_metadata", "task3_log"]: + self.assertEqual(dataset_type_summary.producer, "task3") + case name if name in ["add_dataset5", "add2_dataset5", "task4_metadata", "task4_log"]: + self.assertEqual(dataset_type_summary.producer, "task4") + + # Let's see if we can change lots of counts and info for a task + # which exists in summary 1 and append to the overall summary + # effectively. This summary has a lot of valid variations. 
+ summary3 = summary.model_copy(deep=True) + summary3.tasks["task4"] = summary.tasks["task4"].model_copy( + deep=True, + update={ + "n_successful": 10, + "n_blocked": 20, + "n_unknown": 4, + "n_expected": 47, + "failed_quanta": [ + { + "data_id": {"instrument": "INSTR", "detector": 1}, + "runs": {"run1": "failed"}, + "messages": ["Error on detector 1", "Second error on detector 1"], + }, + { + "data_id": {"instrument": "INSTR", "detector": 2}, + "runs": {"run1": "failed", "run2": "failed"}, + "messages": [], + }, + { + "data_id": {"instrument": "INSTR", "detector": 3}, + "runs": {"run1": "failed"}, + "messages": ["Error on detector 3"], + }, + ], + "recovered_quanta": [ + {"instrument": "INSTR", "detector": 4}, + {"instrument": "INSTR", "detector": 5}, + {"instrument": "INSTR", "detector": 6}, + ], + "wonky_quanta": [ + { + "data_id": {"instrument": "INSTR", "detector": 7}, + "runs": {"run1": "successful", "run2": "failed"}, + "messages": ["This one is wonky because it moved from successful to failed."], + } + ], + "n_wonky": 1, + "n_failed": 3, + }, + ) + summary3.datasets["add_dataset5"] = summary.datasets["add_dataset5"].model_copy( + deep=True, + update={ + "producer": "task4", + "n_visible": 0, + "n_shadowed": 0, + "n_predicted_only": 0, + "n_expected": 47, + "cursed_datasets": [{"instrument": "INSTR", "detector": 7}], + "unsuccessful_datasets": [ + {"instrument": "INSTR", "detector": 0}, + {"instrument": "INSTR", "detector": 1}, + {"instrument": "INSTR", "detector": 2}, + {"instrument": "INSTR", "detector": 3}, + ], + }, + ) + summary3.datasets["add2_dataset5"] = summary.datasets["add2_dataset5"].model_copy( + deep=True, + update={ + "producer": "task4", + "n_visible": 0, + "n_shadowed": 0, + "n_predicted_only": 0, + "n_expected": 47, + "cursed_datasets": [ + {"instrument": "INSTR", "detector": 7}, + ], + "unsuccessful_datasets": [ + {"instrument": "INSTR", "detector": 0}, + {"instrument": "INSTR", "detector": 1}, + {"instrument": "INSTR", "detector": 2}, + {"instrument": "INSTR", "detector": 3}, + ], + }, + ) + # Test that aggregating with this modified summary works + two_graphs_different_numbers = Summary.aggregate([summary, summary3]) + for task_label, task_summary in two_graphs_different_numbers.tasks.items(): + if task_label == "task4": + self.assertEqual(task_summary.n_successful, 10) + self.assertEqual(task_summary.n_blocked, 20) + self.assertEqual(task_summary.n_unknown, 5) + self.assertEqual(task_summary.n_expected, 48) + self.assertListEqual( + task_summary.failed_quanta, + [ + { + "data_id": {"instrument": "INSTR", "detector": 1}, + "runs": {"run1": "failed"}, + "messages": ["Error on detector 1", "Second error on detector 1"], + }, + { + "data_id": {"instrument": "INSTR", "detector": 2}, + "runs": {"run1": "failed", "run2": "failed"}, + "messages": [], + }, + { + "data_id": {"instrument": "INSTR", "detector": 3}, + "runs": {"run1": "failed"}, + "messages": ["Error on detector 3"], + }, + ], + ) + self.assertListEqual( + task_summary.recovered_quanta, + [ + {"instrument": "INSTR", "detector": 4}, + {"instrument": "INSTR", "detector": 5}, + {"instrument": "INSTR", "detector": 6}, + ], + ) + self.assertListEqual( + task_summary.wonky_quanta, + [ + { + "data_id": {"instrument": "INSTR", "detector": 7}, + "runs": {"run1": "successful", "run2": "failed"}, + "messages": ["This one is wonky because it moved from successful to failed."], + } + ], + ) + self.assertEqual(task_summary.n_wonky, 1) + self.assertEqual(task_summary.n_failed, 3) + else: + self.assertEqual(task_summary.n_successful, 0) + self.assertEqual(task_summary.n_blocked, 0) +
self.assertEqual(task_summary.n_unknown, 2) + self.assertEqual(task_summary.n_expected, 2) + self.assertListEqual(task_summary.failed_quanta, []) + self.assertListEqual(task_summary.recovered_quanta, []) + self.assertListEqual(task_summary.wonky_quanta, []) + self.assertEqual(task_summary.n_wonky, 0) + self.assertEqual(task_summary.n_failed, 0) + for dataset_type_name, dataset_type_summary in two_graphs_different_numbers.datasets.items(): + if dataset_type_name in ["add_dataset5", "add2_dataset5"]: + self.assertListEqual( + dataset_type_summary.unsuccessful_datasets, + [ + {"instrument": "INSTR", "detector": 0}, + {"instrument": "INSTR", "detector": 0}, + {"instrument": "INSTR", "detector": 1}, + {"instrument": "INSTR", "detector": 2}, + {"instrument": "INSTR", "detector": 3}, + ], + ) + self.assertEqual(dataset_type_summary.n_visible, 0) + self.assertEqual(dataset_type_summary.n_shadowed, 0) + self.assertEqual(dataset_type_summary.n_predicted_only, 0) + self.assertEqual(dataset_type_summary.n_expected, 48) + self.assertEqual(dataset_type_summary.n_cursed, 1) + self.assertEqual(dataset_type_summary.n_unsuccessful, 5) + self.assertListEqual( + dataset_type_summary.cursed_datasets, [{"instrument": "INSTR", "detector": 7}] + ) + else: + self.assertListEqual( + dataset_type_summary.unsuccessful_datasets, + [{"instrument": "INSTR", "detector": 0}, {"instrument": "INSTR", "detector": 0}], + ) + self.assertEqual(dataset_type_summary.n_visible, 0) + self.assertEqual(dataset_type_summary.n_shadowed, 0) + self.assertEqual(dataset_type_summary.n_predicted_only, 0) + self.assertEqual(dataset_type_summary.n_expected, 2) + self.assertEqual(dataset_type_summary.n_cursed, 0) + self.assertEqual(dataset_type_summary.n_unsuccessful, 2) + self.assertListEqual(dataset_type_summary.cursed_datasets, []) + self.assertIn(dataset_type_name, expected_mock_datasets) + match dataset_type_name: + case name if name in ["add_dataset1", "add2_dataset1", "task0_metadata", "task0_log"]: + self.assertEqual(dataset_type_summary.producer, "task0") + case name if name in ["add_dataset2", "add2_dataset2", "task1_metadata", "task1_log"]: + self.assertEqual(dataset_type_summary.producer, "task1") + case name if name in ["add_dataset3", "add2_dataset3", "task2_metadata", "task2_log"]: + self.assertEqual(dataset_type_summary.producer, "task2") + case name if name in ["add_dataset4", "add2_dataset4", "task3_metadata", "task3_log"]: + self.assertEqual(dataset_type_summary.producer, "task3") + case name if name in ["add_dataset5", "add2_dataset5", "task4_metadata", "task4_log"]: + self.assertEqual(dataset_type_summary.producer, "task4") + + # Now, let's add a task to one model and see if aggregate still + # works + summary4 = summary.model_copy(deep=True) + summary4.tasks["task5"] = TaskSummary( + n_successful=0, + n_blocked=0, + n_unknown=1, + n_expected=1, + failed_quanta=[], + recovered_quanta=[], + wonky_quanta=[], + n_wonky=0, + n_failed=0, + ) + summary4.datasets["add_dataset6"] = DatasetTypeSummary( + producer="task5", + n_visible=0, + n_shadowed=0, + n_predicted_only=0, + n_expected=1, + cursed_datasets=[], + unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}], + n_cursed=0, + n_unsuccessful=1, + ) + summary4.datasets["task5_log"] = DatasetTypeSummary( + producer="task5", + n_visible=0, + n_shadowed=0, + n_predicted_only=0, + n_expected=1, + cursed_datasets=[], + unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}], + n_cursed=0, + n_unsuccessful=1, + ) + summary4.datasets["task5_metadata"] = 
DatasetTypeSummary( + producer="task5", + n_visible=0, + n_shadowed=0, + n_predicted_only=0, + n_expected=1, + cursed_datasets=[], + unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}], + n_cursed=0, + n_unsuccessful=1, + ) + two_graphs_extra_task = Summary.aggregate([summary4, summary]) + # Make sure the extra task is in there + self.assertIn("task5", two_graphs_extra_task.tasks) + for task_label, task_summary in two_graphs_extra_task.tasks.items(): + self.assertEqual(task_summary.n_successful, 0) + self.assertEqual(task_summary.n_blocked, 0) + self.assertListEqual(task_summary.failed_quanta, []) + self.assertListEqual(task_summary.recovered_quanta, []) + self.assertListEqual(task_summary.wonky_quanta, []) + self.assertEqual(task_summary.n_wonky, 0) + self.assertEqual(task_summary.n_failed, 0) + self.assertIn(task_label, ["task0", "task1", "task2", "task3", "task4", "task5"]) + if task_label == "task5": + self.assertEqual(task_summary.n_unknown, 1) + self.assertEqual(task_summary.n_expected, 1) + else: + self.assertEqual(task_summary.n_unknown, 2) + self.assertEqual(task_summary.n_expected, 2) + for dataset_type_name, dataset_type_summary in two_graphs_extra_task.datasets.items(): + self.assertEqual(dataset_type_summary.n_visible, 0) + self.assertEqual(dataset_type_summary.n_shadowed, 0) + self.assertEqual(dataset_type_summary.n_predicted_only, 0) + self.assertEqual(dataset_type_summary.n_cursed, 0) + self.assertListEqual(dataset_type_summary.cursed_datasets, []) + if dataset_type_summary.producer == "task5": + self.assertEqual(dataset_type_summary.n_expected, 1) + self.assertEqual(dataset_type_summary.n_unsuccessful, 1) + self.assertListEqual( + dataset_type_summary.unsuccessful_datasets, + [{"instrument": "INSTR", "detector": 0}], + ) + else: + self.assertEqual(dataset_type_summary.n_expected, 2) + self.assertEqual(dataset_type_summary.n_unsuccessful, 2) + self.assertListEqual( + dataset_type_summary.unsuccessful_datasets, + [{"instrument": "INSTR", "detector": 0}, {"instrument": "INSTR", "detector": 0}], + ) + self.assertIn( + dataset_type_name, + expected_mock_datasets + ["add_dataset6", "task5_metadata", "task5_log"], + ) + match dataset_type_name: + case name if name in ["add_dataset1", "add2_dataset1", "task0_metadata", "task0_log"]: + self.assertEqual(dataset_type_summary.producer, "task0") + case name if name in ["add_dataset2", "add2_dataset2", "task1_metadata", "task1_log"]: + self.assertEqual(dataset_type_summary.producer, "task1") + case name if name in ["add_dataset3", "add2_dataset3", "task2_metadata", "task2_log"]: + self.assertEqual(dataset_type_summary.producer, "task2") + case name if name in ["add_dataset4", "add2_dataset4", "task3_metadata", "task3_log"]: + self.assertEqual(dataset_type_summary.producer, "task3") + case name if name in ["add_dataset5", "add2_dataset5", "task4_metadata", "task4_log"]: + self.assertEqual(dataset_type_summary.producer, "task4") + case name if name in ["add_dataset6", "task5_metadata", "task5_log"]: + self.assertEqual(dataset_type_summary.producer, "task5") + + # Now we test that we properly catch task-dataset mismatches in + # aggregated graphs. This is a problem because if task 1 produced + # a certain dataset in graph 1, but task 2 produced the same dataset + # in graph 2, the graphs are likely not comparable. 
+ summary5 = summary.model_copy(deep=True) + summary5.datasets["add_dataset3"] = summary.datasets["add_dataset3"].model_copy( + deep=True, + update={ + "producer": "task0", + "n_visible": 0, + "n_shadowed": 0, + "n_predicted_only": 0, + "n_expected": 1, + "cursed_datasets": [], + "unsuccessful_datasets": [ + {"instrument": "INSTR", "detector": 0}, + ], + "n_cursed": 0, + "n_unsuccessful": 1, + }, + ) + with self.assertLogs("lsst.pipe.base", level=lsst.utils.logging.VERBOSE) as warning_logs: + Summary.aggregate([summary, summary5]) + self.assertIn( + "WARNING:lsst.pipe.base.quantum_provenance_graph:Producer for dataset type is not consistent" + ": 'task2' != 'task0'.", + warning_logs.output[0], + ) + self.assertIn( + "WARNING:lsst.pipe.base.quantum_provenance_graph:Ignoring 'task0'.", warning_logs.output[1] + ) + + # Next up, we're going to try to aggregate summary with a dictionary + # and then with some garbage. Neither of these should work! + with self.assertRaises(TypeError): + Summary.aggregate( + summary, + { + "tasks": { + "task0": { + "n_successful": 0, + "n_blocked": 0, + "n_unknown": 1, + "n_expected": 1, + "failed_quanta": [], + "recovered_quanta": [], + "wonky_quanta": [], + "n_wonky": 0, + "n_failed": 0, + }, + "datasets": { + "add_dataset1": { + "producer": "task0", + "n_visible": 0, + "n_shadowed": 0, + "n_predicted_only": 0, + "n_expected": 1, + "cursed_datasets": [], + "unsuccessful_datasets": [{"instrument": "INSTR", "detector": 0}], + "n_cursed": 0, + "n_unsuccessful": 1, + }, + "add2_dataset1": { + "producer": "task0", + "n_visible": 0, + "n_shadowed": 0, + "n_predicted_only": 0, + "n_expected": 1, + "cursed_datasets": [], + "unsuccessful_datasets": [{"instrument": "INSTR", "detector": 0}], + "n_cursed": 0, + "n_unsuccessful": 1, + }, + "task0_metadata": { + "producer": "task0", + "n_visible": 0, + "n_shadowed": 0, + "n_predicted_only": 0, + "n_expected": 1, + "cursed_datasets": [], + "unsuccessful_datasets": [{"instrument": "INSTR", "detector": 0}], + "n_cursed": 0, + "n_unsuccessful": 1, + }, + "task0_log": { + "producer": "task0", + "n_visible": 0, + "n_shadowed": 0, + "n_predicted_only": 0, + "n_expected": 1, + "cursed_datasets": [], + "unsuccessful_datasets": [{"instrument": "INSTR", "detector": 0}], + "n_cursed": 0, + "n_unsuccessful": 1, + }, + }, + } + }, + ) + Summary.aggregate([summary, []]) + Summary.aggregate([summary, "some_garbage"])
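An illustrative usage sketch, not part of the diff above: it shows how the new `Summary.aggregate` API might be driven once each data query-identified group has been processed with the same pipeline. The names `butler` and `qgraphs_by_group` (a mapping from group name to the quantum graphs executed for that group) are hypothetical placeholders supplied by the caller; only the calls that appear in this diff and its tests are used.

```python
# Illustrative sketch only; `butler` and `qgraphs_by_group` are hypothetical
# placeholders, and each group is assumed to have run the same pipeline.
from lsst.pipe.base.quantum_provenance_graph import QuantumProvenanceGraph, Summary

summaries = []
for group, qgraphs in qgraphs_by_group.items():
    # Build one provenance graph and one group-level report per group.
    qpg = QuantumProvenanceGraph()
    qpg.assemble_quantum_provenance_graph(butler, qgraphs)
    summaries.append(qpg.to_summary(butler))

# Combine the group-level reports into a single holistic report.
overall = Summary.aggregate(summaries)
for label, task_summary in overall.tasks.items():
    print(f"{label}: {task_summary.n_successful}/{task_summary.n_expected} quanta successful")
```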