Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-41605: Command-line aggregator for pipetask report #437

Merged
merged 4 commits into from
Oct 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions doc/changes/DM-41605.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
Add functionality to aggregate multiple `QuantumProvenanceGraph.Summary`
objects into one `Summary` for a holistic report.

While the `QuantumProvenanceGraph` was designed to resolve processing over
dataquery-identified groups, `Summary.aggregate` is designed to combine
multiple group-level reports into one which totals the successes, issues and
failures over the same section of pipeline.
72 changes: 71 additions & 1 deletion python/lsst/pipe/base/quantum_provenance_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,24 @@ def add_quantum_info(self, info: QuantumInfo, butler: Butler, do_store_logs: boo
case unrecognized_state:
raise AssertionError(f"Unrecognized quantum status {unrecognized_state!r}")

def add_data_id_group(self, other_summary: TaskSummary) -> None:
    """Fold the contents of another `TaskSummary` into this one.

    Called while aggregating `Summary` reports, where each report covers
    one dataquery-identified group.

    Parameters
    ----------
    other_summary : `TaskSummary`
        `TaskSummary` whose counters and quanta lists are added to this
        one.
    """
    # Plain counters: add the incoming value onto the running total.
    for counter in ("n_successful", "n_blocked", "n_unknown", "n_expected"):
        running_total = getattr(self, counter) + getattr(other_summary, counter)
        setattr(self, counter, running_total)

    # Per-quantum lists: grow this summary's lists in place with the
    # incoming entries.
    for bucket in ("wonky_quanta", "recovered_quanta", "failed_quanta"):
        getattr(self, bucket).extend(getattr(other_summary, bucket))


class CursedDatasetSummary(pydantic.BaseModel):
"""A summary of all the relevant information on a cursed dataset."""
Expand Down Expand Up @@ -549,7 +567,7 @@ class DatasetTypeSummary(pydantic.BaseModel):
runs.
"""

producer: str
producer: str = ""
"""The name of the task which produced this dataset.
"""

Expand Down Expand Up @@ -626,6 +644,37 @@ def add_dataset_info(self, info: DatasetInfo, producer_info: QuantumInfo) -> Non
case unrecognized_state:
raise AssertionError(f"Unrecognized dataset status {unrecognized_state!r}")

def add_data_id_group(self, other_summary: DatasetTypeSummary) -> None:
    """Fold the contents of another `DatasetTypeSummary` into this one.

    Called while aggregating `Summary` reports, where each report covers
    one dataquery-identified group.

    Parameters
    ----------
    other_summary : `DatasetTypeSummary`
        `DatasetTypeSummary` whose counters and dataset lists are added to
        this one.
    """
    incoming_producer = other_summary.producer
    if not self.producer:
        # No producer recorded here yet: adopt the incoming one, unless it
        # is empty as well.
        if incoming_producer:
            self.producer = incoming_producer
    elif incoming_producer and self.producer != incoming_producer:
        # Both sides name a producer and they disagree: keep ours and warn.
        _LOG.warning(
            "Producer for dataset type is not consistent: %r != %r.",
            self.producer,
            incoming_producer,
        )
        _LOG.warning("Ignoring %r.", incoming_producer)

    # Plain counters: add the incoming value onto the running total.
    for counter in ("n_visible", "n_shadowed", "n_predicted_only", "n_expected"):
        running_total = getattr(self, counter) + getattr(other_summary, counter)
        setattr(self, counter, running_total)

    # Per-dataset lists: grow this summary's lists in place with the
    # incoming entries.
    for bucket in ("cursed_datasets", "unsuccessful_datasets"):
        getattr(self, bucket).extend(getattr(other_summary, bucket))


class Summary(pydantic.BaseModel):
"""A summary of the contents of the QuantumProvenanceGraph, including
Expand All @@ -641,6 +690,27 @@ class Summary(pydantic.BaseModel):
"""Summaries for the datasets.
"""

@classmethod
def aggregate(cls, summaries: Sequence[Summary]) -> Summary:
    """Merge summaries from disjoint data id groups into one overall
    summary of common tasks and datasets. Intended for use when the same
    pipeline has been run over all groups.

    Parameters
    ----------
    summaries : `Sequence[Summary]`
        Sequence of all `Summary` objects to aggregate.

    Returns
    -------
    combined : `Summary`
        Newly constructed summary holding, per task label and per dataset
        type, the accumulated contents of every input summary.
    """
    combined = cls()
    for group in summaries:
        # Tasks are matched across groups by their label.
        for label, group_task_summary in group.tasks.items():
            if label not in combined.tasks:
                combined.tasks[label] = TaskSummary()
            combined.tasks[label].add_data_id_group(group_task_summary)
        # Dataset types are matched across groups by their name.
        for dataset_type, group_dataset_summary in group.datasets.items():
            if dataset_type not in combined.datasets:
                combined.datasets[dataset_type] = DatasetTypeSummary()
            combined.datasets[dataset_type].add_data_id_group(group_dataset_summary)
    return combined


class QuantumProvenanceGraph:
"""A set of already-run, merged quantum graphs with provenance
Expand Down
Loading
Loading