-
-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
use
pytest-reportlog
to generate upstream-dev CI failure reports (#…
…6699) * rely on pytest-reportlog to produce the log file * use the new script to generate the report * install the dependencies of the parser script * temporarily enable report generation [test-upstream] * actually upload the log file [skip-ci][test-upstream] * print the created report [skip-ci][test-upstream] * Undo the temporary changes [test-upstream]
- Loading branch information
Showing
2 changed files
with
96 additions
and
45 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,57 +1,102 @@ | ||
# type: ignore | ||
import argparse | ||
import itertools | ||
import functools | ||
import json | ||
import pathlib | ||
import textwrap | ||
from dataclasses import dataclass | ||
|
||
parser = argparse.ArgumentParser() | ||
parser.add_argument("filepaths", nargs="+", type=pathlib.Path) | ||
args = parser.parse_args() | ||
from pytest import CollectReport, TestReport | ||
|
||
filepaths = sorted(p for p in args.filepaths if p.is_file()) | ||
|
||
@dataclass | ||
class SessionStart: | ||
pytest_version: str | ||
outcome: str = "status" | ||
|
||
def extract_short_test_summary_info(lines): | ||
up_to_start_of_section = itertools.dropwhile( | ||
lambda l: "=== short test summary info ===" not in l, | ||
lines, | ||
) | ||
up_to_section_content = itertools.islice(up_to_start_of_section, 1, None) | ||
section_content = itertools.takewhile( | ||
lambda l: l.startswith("FAILED") or l.startswith("ERROR"), up_to_section_content | ||
) | ||
content = "\n".join(section_content) | ||
@classmethod | ||
def _from_json(cls, json): | ||
json_ = json.copy() | ||
json_.pop("$report_type") | ||
return cls(**json_) | ||
|
||
return content | ||
|
||
@dataclass | ||
class SessionFinish: | ||
exitstatus: str | ||
outcome: str = "status" | ||
|
||
def format_log_message(path): | ||
py_version = path.name.split("-")[1] | ||
summary = f"Python {py_version} Test Summary Info" | ||
with open(path) as f: | ||
data = extract_short_test_summary_info(line.rstrip() for line in f) | ||
message = ( | ||
textwrap.dedent( | ||
"""\ | ||
<details><summary>{summary}</summary> | ||
@classmethod | ||
def _from_json(cls, json): | ||
json_ = json.copy() | ||
json_.pop("$report_type") | ||
return cls(**json_) | ||
|
||
``` | ||
{data} | ||
``` | ||
|
||
</details> | ||
""" | ||
) | ||
.rstrip() | ||
.format(summary=summary, data=data) | ||
) | ||
def parse_record(record): | ||
report_types = { | ||
"TestReport": TestReport, | ||
"CollectReport": CollectReport, | ||
"SessionStart": SessionStart, | ||
"SessionFinish": SessionFinish, | ||
} | ||
cls = report_types.get(record["$report_type"]) | ||
if cls is None: | ||
raise ValueError(f"unknown report type: {record['$report_type']}") | ||
|
||
return cls._from_json(record) | ||
|
||
|
||
@functools.singledispatch | ||
def format_summary(report): | ||
return f"{report.nodeid}: {report}" | ||
|
||
|
||
@format_summary.register | ||
def _(report: TestReport): | ||
message = report.longrepr.chain[0][1].message | ||
return f"{report.nodeid}: {message}" | ||
|
||
|
||
@format_summary.register | ||
def _(report: CollectReport): | ||
message = report.longrepr.split("\n")[-1].removeprefix("E").lstrip() | ||
return f"{report.nodeid}: {message}" | ||
|
||
|
||
def format_report(reports, py_version): | ||
newline = "\n" | ||
summaries = newline.join(format_summary(r) for r in reports) | ||
message = textwrap.dedent( | ||
"""\ | ||
<details><summary>Python {py_version} Test Summary</summary> | ||
``` | ||
{summaries} | ||
``` | ||
</details> | ||
""" | ||
).format(summaries=summaries, py_version=py_version) | ||
return message | ||
|
||
|
||
print("Parsing logs ...") | ||
message = "\n\n".join(format_log_message(path) for path in filepaths) | ||
if __name__ == "__main__": | ||
parser = argparse.ArgumentParser() | ||
parser.add_argument("filepath", type=pathlib.Path) | ||
args = parser.parse_args() | ||
|
||
py_version = args.filepath.stem.split("-")[1] | ||
|
||
print("Parsing logs ...") | ||
|
||
lines = args.filepath.read_text().splitlines() | ||
reports = [parse_record(json.loads(line)) for line in lines] | ||
|
||
failed = [report for report in reports if report.outcome == "failed"] | ||
|
||
message = format_report(failed, py_version=py_version) | ||
|
||
output_file = pathlib.Path("pytest-logs.txt") | ||
print(f"Writing output file to: {output_file.absolute()}") | ||
output_file.write_text(message) | ||
output_file = pathlib.Path("pytest-logs.txt") | ||
print(f"Writing output file to: {output_file.absolute()}") | ||
output_file.write_text(message) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters