From 4bd027393c32d18fb85f92e6af5c6b2207bd4c33 Mon Sep 17 00:00:00 2001 From: Boyen van Gorp Date: Fri, 10 Mar 2023 09:49:28 +0100 Subject: [PATCH] PEP8 standard --- sigridci/sigridci.py | 564 +++++++++++++++++++++---------------------- 1 file changed, 281 insertions(+), 283 deletions(-) diff --git a/sigridci/sigridci.py b/sigridci/sigridci.py index 4ccd8884..0b47b63e 100755 --- a/sigridci/sigridci.py +++ b/sigridci/sigridci.py @@ -15,9 +15,8 @@ # limitations under the License. import argparse -import base64 -import datetime import dataclasses +import datetime import html import json import os @@ -33,7 +32,6 @@ from dataclasses import dataclass from xml.dom import minidom - LOG_HISTORY = [] @@ -45,91 +43,91 @@ def log(message): @dataclass class UploadOptions: - sourceDir: str = None - excludePatterns: typing.List[str] = dataclasses.field(default_factory=lambda: []) - includeHistory: bool = False + source_dir: str = None + exclude_patterns: typing.List[str] = dataclasses.field(default_factory=lambda: []) + include_history: bool = False subsystem: str = "" - showContents: bool = False - publishOnly: bool = False + show_contents: bool = False + publish_only: bool = False - def readScopeFile(self): - return self.locateFile(["sigrid.yaml", "sigrid.yml"]) + def read_scope_file(self): + return self.locate_file(["sigrid.yaml", "sigrid.yml"]) - def readMetadataFile(self): - return self.locateFile(["sigrid-metadata.yaml", "sigrid-metadata.yml"]) + def read_metadata_file(self): + return self.locate_file(["sigrid-metadata.yaml", "sigrid-metadata.yml"]) - def locateFile(self, possibleFileNames): - for file in possibleFileNames: - if os.path.exists(f"{self.sourceDir}/{file}"): - with open(f"{self.sourceDir}/{file}", "r") as f: + def locate_file(self, possible_file_names): + for file in possible_file_names: + if os.path.exists(f"{self.source_dir}/{file}"): + with open(f"{self.source_dir}/{file}", "r") as f: return f.read() return None class TargetQuality: - def __init__(self, scope, targetRating): - self.ratings = {"MAINTAINABILITY" : targetRating} + def __init__(self, scope, target_rating): + self.ratings = {"MAINTAINABILITY" : target_rating} # We can't use pyyaml because PIP is not available in the some of the # very diverse set of customer environments where Sigrid CI is used. - targetPattern = re.compile("(" + "|".join(Report.METRICS) + "):\s*([\d\.]+)", re.IGNORECASE) + target_pattern = re.compile("(" + "|".join(Report.METRICS) + "):\s*([\d\.]+)", re.IGNORECASE) for line in scope.split("\n"): - match = targetPattern.match(line.strip()) + match = target_pattern.match(line.strip()) if match: log(f"Loading {match.group(1).upper()} target from scope configuration file") self.ratings[match.group(1).upper()] = float(match.group(2)) - def meetsTargetQualityForMetric(self, feedback, metric): + def meets_target_quality_for_metric(self, feedback, metric): value = feedback["newCodeRatings"].get(metric, None) - targetRating = self.ratings.get(metric, None) - return value == None or targetRating == None or value >= targetRating + target_rating = self.ratings.get(metric, None) + return value == None or target_rating == None or value >= target_rating - def meetsQualityTargets(self, feedback): - return all(self.meetsTargetQualityForMetric(feedback, metric) for metric in self.ratings) + def meets_quality_targets(self, feedback): + return all(self.meets_target_quality_for_metric(feedback, metric) for metric in self.ratings) -class SigridApiClient: +class Sigridapi_client: API_VERSION = "v1" POLL_INTERVAL = 30 POLL_ATTEMPTS = 120 def __init__(self, args): - self.baseURL = args.sigridurl - self.urlPartnerName = urllib.parse.quote_plus(args.partner.lower()) - self.urlCustomerName = urllib.parse.quote_plus(args.customer.lower()) - self.urlSystemName = urllib.parse.quote_plus(args.system.lower()) + self.base_url = args.sigridurl + self.url_partner_name = urllib.parse.quote_plus(args.partner.lower()) + self.url_customer_name = urllib.parse.quote_plus(args.customer.lower()) + self.url_system_name = urllib.parse.quote_plus(args.system.lower()) self.publish = args.publish or args.publishonly self.subsystem = args.subsystem - def callSigridAPI(self, path, body=None, contentType=None): - url = f"{self.baseURL}/rest/{path}" + def call_sigrid_api(self, path, body=None, content_type=None): + url = f"{self.base_url}/rest/{path}" request = urllib.request.Request(url, body) request.add_header("Accept", "application/json") request.add_header("Authorization", f"Bearer {os.environ['SIGRID_CI_TOKEN']}".encode("utf8")) - if contentType != None: - request.add_header("Content-Type", contentType) + if content_type != None: + request.add_header("Content-Type", content_type) response = urllib.request.urlopen(request) if response.status == 204: return {} - responseBody = response.read().decode("utf8") - if len(responseBody) == 0: + response_body = response.read().decode("utf8") + if len(response_body) == 0: log("Received empty response") return {} - return json.loads(responseBody) + return json.loads(response_body) - def retry(self, operation, *, attempts=5, allow404=False, allowEmpty=True): + def retry(self, operation, *, attempts=5, allow_404=False, allow_empty=True): for attempt in range(attempts): try: response = operation() - if allowEmpty or response != {}: + if allow_empty or response != {}: return response except urllib.error.HTTPError as e: if e.code in [401, 403]: log("You are not authorized to access Sigrid for this system") sys.exit(1) - elif allow404 and e.code == 404: + elif allow_404 and e.code == 404: return False # These statements are intentionally outside of the except-block, @@ -140,75 +138,75 @@ def retry(self, operation, *, attempts=5, allow404=False, allowEmpty=True): log(f"Sigrid is currently unavailable, failed after {attempts} attempts") sys.exit(1) - def submitUpload(self, options, systemExists): + def submit_upload(self, options, system_exists): with tempfile.TemporaryDirectory() as tempDir: log("Creating upload") - uploadPacker = SystemUploadPacker(options) + upload_packer = SystemUploadPacker(options) upload = os.path.join(tempDir, "upload.zip") - uploadPacker.prepareUpload(options.sourceDir, upload) + upload_packer.prepare_upload(options.source_dir, upload) log("Preparing upload") - uploadLocation = self.obtainUploadLocation(systemExists) - uploadUrl = uploadLocation["uploadUrl"] - analysisId = uploadLocation["ciRunId"] - log(f"Sigrid CI analysis ID: {analysisId}") + upload_location = self.obtain_upload_location(system_exists) + upload_url = upload_location["upload_url"] + analysis_id = upload_location["ciRunId"] + log(f"Sigrid CI analysis ID: {analysis_id}") log("Publishing upload" if self.publish else "Submitting upload") - self.uploadBinaryFile(uploadUrl, upload) + self.upload_binary_file(upload_url, upload) - return analysisId + return analysis_id - def obtainUploadLocation(self, systemExists): - path = f"/inboundresults/{self.urlPartnerName}/{self.urlCustomerName}/{self.urlSystemName}/ci/uploads/{self.API_VERSION}" - if not systemExists: + def obtain_upload_location(self, system_exists): + path = f"/inboundresults/{self.url_partner_name}/{self.url_customer_name}/{self.url_system_name}/ci/uploads/{self.API_VERSION}" + if not system_exists: path += "/onboarding" elif self.publish: path += "/publish" if self.subsystem: path += "?subsystem=" + urllib.parse.quote_plus(self.subsystem) - return self.retry(lambda: self.callSigridAPI(path)) + return self.retry(lambda: self.call_sigrid_api(path)) - def validateScopeFile(self, scopeFile): - path = f"/inboundresults/{self.urlPartnerName}/{self.urlCustomerName}/{self.urlSystemName}/ci/validate/{self.API_VERSION}" - return self.retry(lambda: self.callSigridAPI(path, scopeFile.encode("utf8"), "application/yaml")) + def validate_scope_file(self, scopeFile): + path = f"/inboundresults/{self.url_partner_name}/{self.url_customer_name}/{self.url_system_name}/ci/validate/{self.API_VERSION}" + return self.retry(lambda: self.call_sigrid_api(path, scopeFile.encode("utf8"), "application/yaml")) - def validateMetadata(self, metadataFile): - path = f"/analysis-results/sigridci/{self.urlCustomerName}/validate" - return self.retry(lambda: self.callSigridAPI(path, metadataFile.encode("utf8"), "application/yaml")) + def validate_metadata(self, metadataFile): + path = f"/analysis-results/sigridci/{self.url_customer_name}/validate" + return self.retry(lambda: self.call_sigrid_api(path, metadataFile.encode("utf8"), "application/yaml")) - def uploadBinaryFile(self, url, upload): - self.retry(lambda: self.attemptUpload(url, upload)) + def upload_binary_file(self, url, upload): + self.retry(lambda: self.attempt_upload(url, upload)) log(f"Upload successful") - def attemptUpload(self, url, upload): + def attempt_upload(self, url, upload): with open(upload, "rb") as uploadRef: - uploadRequest = urllib.request.Request(url, data=uploadRef) - uploadRequest.method = "PUT" - uploadRequest.add_header("Content-Type", "application/zip") - uploadRequest.add_header("Content-Length", "%d" % os.path.getsize(upload)) - uploadRequest.add_header("x-amz-server-side-encryption", "AES256") - urllib.request.urlopen(uploadRequest) - - def checkSystemExists(self): - path = f"/analysis-results/sigridci/{self.urlCustomerName}/{self.urlSystemName}/{self.API_VERSION}/ci" - return self.retry(lambda: self.callSigridAPI(path), allow404=True) != False - - def fetchAnalysisResults(self, analysisId): + upload_request = urllib.request.Request(url, data=uploadRef) + upload_request.method = "PUT" + upload_request.add_header("Content-Type", "application/zip") + upload_request.add_header("Content-Length", "%d" % os.path.getsize(upload)) + upload_request.add_header("x-amz-server-side-encryption", "AES256") + urllib.request.urlopen(upload_request) + + def check_system_exists(self): + path = f"/analysis-results/sigridci/{self.url_customer_name}/{self.url_system_name}/{self.API_VERSION}/ci" + return self.retry(lambda: self.call_sigrid_api(path), allow_404=True) != False + + def fetch_analysis_results(self, analysis_id): log("Waiting for analysis results") - path = f"/analysis-results/sigridci/{self.urlCustomerName}/{self.urlSystemName}/{self.API_VERSION}/ci/results/{analysisId}" - return self.retry(lambda: self.callSigridAPI(path), attempts=self.POLL_ATTEMPTS, allowEmpty=False) + path = f"/analysis-results/sigridci/{self.url_customer_name}/{self.url_system_name}/{self.API_VERSION}/ci/results/{analysis_id}" + return self.retry(lambda: self.call_sigrid_api(path), attempts=self.POLL_ATTEMPTS, allow_empty=False) - def fetchMetadata(self): - path = f"/analysis-results/api/{self.API_VERSION}/system-metadata/{self.urlCustomerName}/{self.urlSystemName}" - return self.retry(lambda: self.callSigridAPI(path)) + def fetch_metadata(self): + path = f"/analysis-results/api/{self.API_VERSION}/system-metadata/{self.url_customer_name}/{self.url_system_name}" + return self.retry(lambda: self.call_sigrid_api(path)) - def fetchObjectives(self): - path = f"/analysis-results/api/{self.API_VERSION}/objectives/{self.urlCustomerName}/{self.urlSystemName}/config" - return self.retry(lambda: self.callSigridAPI(path)) + def fetch_objectives(self): + path = f"/analysis-results/api/{self.API_VERSION}/objectives/{self.url_customer_name}/{self.url_system_name}/config" + return self.retry(lambda: self.call_sigrid_api(path)) - def getLandingPage(self, analysisId, target): - targetRating = "%.1f" % target.ratings["MAINTAINABILITY"] - return f"{self.baseURL}/{self.urlCustomerName}/{self.urlSystemName}/-/sigrid-ci/{analysisId}?targetRating={targetRating}" + def get_landing_page(self, analysis_id, target): + target_rating = "%.1f" % target.ratings["MAINTAINABILITY"] + return f"{self.base_url}/{self.url_customer_name}/{self.url_system_name}/-/sigrid-ci/{analysis_id}?target_rating={target_rating}" class SystemUploadPacker: @@ -234,55 +232,55 @@ class SystemUploadPacker: def __init__(self, options): self.options = options - def prepareUpload(self, sourceDir, outputFile): - zipFile = zipfile.ZipFile(outputFile, "w", zipfile.ZIP_DEFLATED) - hasContents = False + def prepare_upload(self, source_dir, output_file): + zip_file = zipfile.ZipFile(output_file, "w", zipfile.ZIP_DEFLATED) + has_contents = False - if self.options.includeHistory and os.path.exists(f"{sourceDir}/.git"): - self.includeRepositoryHistory(sourceDir) + if self.options.include_history and os.path.exists(f"{source_dir}/.git"): + self.include_repository_history(source_dir) - for root, dirs, files in os.walk(sourceDir): + for root, dirs, files in os.walk(source_dir): for file in sorted(files): - filePath = os.path.join(root, file) - if file != outputFile and not self.isExcluded(filePath): - relativePath = os.path.relpath(os.path.join(root, file), sourceDir) - hasContents = True - if self.options.showContents: - log(f"Adding file to upload: {relativePath}") - zipFile.write(filePath, relativePath) + file_path = os.path.join(root, file) + if file != output_file and not self.is_excluded(file_path): + relative_path = os.path.relpath(os.path.join(root, file), source_dir) + has_contents = True + if self.options.show_contents: + log(f"Adding file to upload: {relative_path}") + zip_file.write(file_path, relative_path) - zipFile.close() + zip_file.close() - self.checkUploadContents(outputFile, hasContents) + self.check_upload_contents(output_file, has_contents) - def checkUploadContents(self, outputFile, hasContents): - uploadSizeBytes = os.path.getsize(outputFile) - uploadSizeMB = max(round(uploadSizeBytes / 1024 / 1024), 1) - log(f"Upload size is {uploadSizeMB} MB") + def check_upload_contents(self, output_file, has_contents): + upload_size_bytes = os.path.getsize(output_file) + upload_size_mb = max(round(upload_size_bytes / 1024 / 1024), 1) + log(f"Upload size is {upload_size_mb} MB") - if uploadSizeMB > self.MAX_UPLOAD_SIZE_MB: + if upload_size_mb > self.MAX_UPLOAD_SIZE_MB: raise Exception(f"Upload exceeds maximum size of {self.MAX_UPLOAD_SIZE_MB} MB") - elif not hasContents: + elif not has_contents: print(f"No code found to upload, please check the directory used for --source") sys.exit(1) - elif uploadSizeBytes < 50000: + elif upload_size_bytes < 50000: log("Warning: Upload is very small, source directory might not contain all source code") - def isExcluded(self, filePath): - excludePatterns = self.DEFAULT_EXCLUDES + (self.options.excludePatterns or []) - normalizedPath = filePath.replace("\\", "/") - for exclude in excludePatterns: - if exclude != "" and exclude.strip() in normalizedPath: + def is_excluded(self, file_path): + exclude_patterns = self.DEFAULT_EXCLUDES + (self.options.exclude_patterns or []) + normalized_path = file_path.replace("\\", "/") + for exclude in exclude_patterns: + if exclude != "" and exclude.strip() in normalized_path: return True return False - def includeRepositoryHistory(self, sourceDir): - gitCommand = ["git", "-C", sourceDir, "--no-pager", "log", "--date=iso", "--format='@@@;%H;%an;%ae;%ad;%s'", \ + def include_repository_history(self, source_dir): + git_command = ["git", "-C", source_dir, "--no-pager", "log", "--date=iso", "--format='@@@;%H;%an;%ae;%ad;%s'", \ "--numstat", "--no-merges"] try: - output = subprocess.run(gitCommand, stdout=subprocess.PIPE) + output = subprocess.run(git_command, stdout=subprocess.PIPE) if output.returncode == 0: - with open(f"{sourceDir}/git.log", "w") as f: + with open(f"{source_dir}/git.log", "w") as f: f.write(output.stdout.decode("utf8")) else: log("Exporting repository history failed") @@ -297,31 +295,31 @@ class Report: REFACTORING_CANDIDATE_METRICS = ["DUPLICATION", "UNIT_SIZE", "UNIT_COMPLEXITY", "UNIT_INTERFACING", "MODULE_COUPLING"] - def generate(self, analysisId, feedback, args, target): + def generate(self, analysis_id, feedback, args, target): pass - def formatMetricName(self, metric): + def format_metric_name(self, metric): return metric.replace("_PROP", "").title().replace("_", " ") - def formatRating(self, ratings, metric, naText="N/A"): + def format_rating(self, ratings, metric, naText="N/A"): if ratings.get(metric, None) == None: return naText return "%.1f" % ratings[metric] - def formatBaseline(self, feedback): + def format_baseline(self, feedback): if not feedback.get("baseline", None): return "N/A" - snapshotDate = datetime.datetime.strptime(feedback["baseline"], "%Y%m%d") - return snapshotDate.strftime("%Y-%m-%d") + snapshot_date = datetime.datetime.strptime(feedback["baseline"], "%Y%m%d") + return snapshot_date.strftime("%Y-%m-%d") - def getSigridUrl(self, args): + def get_sigrid_url(self, args): customer = urllib.parse.quote_plus(args.customer.lower()) system = urllib.parse.quote_plus(args.system.lower()) return f"https://sigrid-says.com/{customer}/{system}" - def getRefactoringCandidates(self, feedback, metric): - refactoringCandidates = feedback.get("refactoringCandidates", []) - return [rc for rc in refactoringCandidates if rc["metric"] == metric or metric == "MAINTAINABILITY"] + def get_refactoring_candidates(self, feedback, metric): + refactoring_candidates = feedback.get("refactoringCandidates", []) + return [rc for rc in refactoring_candidates if rc["metric"] == metric or metric == "MAINTAINABILITY"] class TextReport(Report): @@ -335,223 +333,223 @@ class TextReport(Report): def __init__(self, output=sys.stdout): self.output = output - def generate(self, analysisId, feedback, args, target): - self.printHeader("Refactoring candidates") + def generate(self, analysis_id, feedback, args, target): + self.print_header("Refactoring candidates") for metric in self.REFACTORING_CANDIDATE_METRICS: - self.printMetric(feedback, metric) + self.print_metric(feedback, metric) - self.printHeader("Maintainability ratings") - self.printTableRow(["System property", f"Baseline on {self.formatBaseline(feedback)}", \ - "New/changed code", "Target", "Overall" if args.publish else ""] ) + self.print_header("Maintainability ratings") + self.print_table_row(["System property", f"Baseline on {self.format_baseline(feedback)}", \ + "New/changed code", "Target", "Overall" if args.publish else ""]) for metric in self.METRICS: if metric == "MAINTAINABILITY": - self.printSeparator() + self.print_separator() row = [ - self.formatMetricName(metric), - "(" + self.formatRating(feedback["baselineRatings"], metric) + ")", - self.formatRating(feedback["newCodeRatings"], metric), + self.format_metric_name(metric), + "(" + self.format_rating(feedback["baselineRatings"], metric) + ")", + self.format_rating(feedback["newCodeRatings"], metric), str(target.ratings.get(metric, "")), - self.formatRating(feedback["baselineRatings"], metric) if args.publish else "" + self.format_rating(feedback["baselineRatings"], metric) if args.publish else "" ] - self.printTableRow(row, self.getRatingColor(feedback, target, metric)) + self.print_table_row(row, self.get_rating_color(feedback, target, metric)) - def printTableRow(self, row, color=None): - formattedRow = "%-27s%-25s%-20s%-10s%-7s" % tuple(row) + def print_table_row(self, row, color=None): + formatted_row = "%-27s%-25s%-20s%-10s%-7s" % tuple(row) if color: - self.printColor(formattedRow, color) + self.print_color(formatted_row, color) else: - print(formattedRow, file=self.output) + print(formatted_row, file=self.output) - def printHeader(self, header): + def print_header(self, header): print("", file=self.output) - self.printSeparator() + self.print_separator() print(header, file=self.output) - self.printSeparator() + self.print_separator() - def printSeparator(self): + def print_separator(self): print("-" * self.LINE_WIDTH, file=self.output) - def printMetric(self, feedback, metric): + def print_metric(self, feedback, metric): print("", file=self.output) - print(self.formatMetricName(metric), file=self.output) + print(self.format_metric_name(metric), file=self.output) - refactoringCandidates = self.getRefactoringCandidates(feedback, metric) - if len(refactoringCandidates) == 0: + refactoring_candidates = self.get_refactoring_candidates(feedback, metric) + if len(refactoring_candidates) == 0: print(" None", file=self.output) else: - for rc in refactoringCandidates: - print(self.formatRefactoringCandidate(rc), file=self.output) + for rc in refactoring_candidates: + print(self.format_refactoring_candidate(rc), file=self.output) - def getRatingColor(self, feedback, target, metric): + def get_rating_color(self, feedback, target, metric): if feedback["newCodeRatings"].get(metric, None) == None or not metric in target.ratings: return self.ANSI_BLUE - elif target.meetsTargetQualityForMetric(feedback, metric): + elif target.meets_target_quality_for_metric(feedback, metric): return self.ANSI_GREEN else: return self.ANSI_RED - def formatRefactoringCandidate(self, rc): + def format_refactoring_candidate(self, rc): category = ("(" + rc["category"] + ")").ljust(14) subject = rc["subject"].replace("\n", "\n" + (" " * 21)).replace("::", "\n" + (" " * 21)) return f" - {category} {subject}" - def printColor(self, message, ansiPrefix): - print(ansiPrefix + message + "\033[0m", file=self.output) + def print_color(self, message, ansi_prefix): + print(ansi_prefix + message + "\033[0m", file=self.output) class StaticHtmlReport(Report): HTML_STAR_FULL = "★" HTML_STAR_EMPTY = "☆" - def generate(self, analysisId, feedback, args, target): + def generate(self, analysis_id, feedback, args, target): with open(os.path.dirname(__file__) + "/sigridci-feedback-template.html", encoding="utf-8", mode="r") as templateRef: template = templateRef.read() - template = self.renderHtmlFeedback(template, feedback, args, target) + template = self.render_html_feedback(template, feedback, args, target) - reportFile = os.path.abspath("sigrid-ci-output/index.html") - writer = open(reportFile, encoding="utf-8", mode="w") + report_file = os.path.abspath("sigrid-ci-output/index.html") + writer = open(report_file, encoding="utf-8", mode="w") writer.write(template) writer.close() print("") print("You can find the full results here:") - print(" " + reportFile) + print(" " + report_file) print("") print("You can find more information about these results in Sigrid:") - print(" " + self.getSigridUrl(args)) + print(" " + self.get_sigrid_url(args)) print("") - def renderHtmlFeedback(self, template, feedback, args, target): + def render_html_feedback(self, template, feedback, args, target): placeholders = { "CUSTOMER" : html.escape(args.customer), "SYSTEM" : html.escape(args.system), "TARGET" : "%.1f" % target.ratings["MAINTAINABILITY"], "LINES_OF_CODE_TOUCHED" : "%d" % feedback.get("newCodeLinesOfCode", 0), - "BASELINE_DATE" : self.formatBaseline(feedback), - "SIGRID_LINK" : self.getSigridUrl(args), - "MAINTAINABILITY_PASSED" : ("passed" if target.meetsQualityTargets(feedback) else "failed") + "BASELINE_DATE" : self.format_baseline(feedback), + "SIGRID_LINK" : self.get_sigrid_url(args), + "MAINTAINABILITY_PASSED" : ("passed" if target.meets_quality_targets(feedback) else "failed") } for metric in self.METRICS: - placeholders[f"{metric}_OVERALL"] = self.formatRating(feedback["baselineRatings"], metric) - placeholders[f"{metric}_NEW"] = self.formatRating(feedback["newCodeRatings"], metric) - placeholders[f"{metric}_TARGET"] = self.formatRating(target.ratings, metric, "") - placeholders[f"{metric}_STARS_OVERALL"] = self.formatHtmlStars(feedback["baselineRatings"], metric) - placeholders[f"{metric}_STARS_NEW"] = self.formatHtmlStars(feedback["newCodeRatings"], metric) - placeholders[f"{metric}_PASSED"] = self.formatPassed(feedback, target, metric) - placeholders[f"{metric}_REFACTORING_CANDIDATES"] = self.formatRefactoringCandidates(feedback, metric) + placeholders[f"{metric}_OVERALL"] = self.format_rating(feedback["baselineRatings"], metric) + placeholders[f"{metric}_NEW"] = self.format_rating(feedback["newCodeRatings"], metric) + placeholders[f"{metric}_TARGET"] = self.format_rating(target.ratings, metric, "") + placeholders[f"{metric}_STARS_OVERALL"] = self.format_html_stars(feedback["baselineRatings"], metric) + placeholders[f"{metric}_STARS_NEW"] = self.format_html_stars(feedback["newCodeRatings"], metric) + placeholders[f"{metric}_PASSED"] = self.format_passed(feedback, target, metric) + placeholders[f"{metric}_REFACTORING_CANDIDATES"] = self.format_refactoring_candidates(feedback, metric) - return self.fillPlaceholders(template, placeholders) + return self.fill_placeholders(template, placeholders) - def fillPlaceholders(self, template, placeholders): + def fill_placeholders(self, template, placeholders): for placeholder, value in placeholders.items(): template = template.replace(f"@@@{placeholder}", value) return template - def formatPassed(self, feedback, target, metric): + def format_passed(self, feedback, target, metric): if target.ratings.get(metric, None) == None: return "" - return "passed" if target.meetsTargetQualityForMetric(feedback, metric) else "failed" + return "passed" if target.meets_target_quality_for_metric(feedback, metric) else "failed" - def formatRefactoringCandidates(self, feedback, metric): - refactoringCandidates = self.getRefactoringCandidates(feedback, metric) - if len(refactoringCandidates) == 0: + def format_refactoring_candidates(self, feedback, metric): + refactoring_candidates = self.get_refactoring_candidates(feedback, metric) + if len(refactoring_candidates) == 0: return "None" - return "\n".join([self.formatRefactoringCandidate(rc) for rc in refactoringCandidates]) + return "\n".join([self.format_refactoring_candidate(rc) for rc in refactoring_candidates]) - def formatRefactoringCandidate(self, rc): - subjectName = html.escape(rc["subject"]).replace("\n", "
").replace("::", "
") + def format_refactoring_candidate(self, rc): + subject_name = html.escape(rc["subject"]).replace("\n", "
").replace("::", "
") category = html.escape(rc["category"]) - return f"({category})
{subjectName}
" + return f"({category})
{subject_name}
" - def formatHtmlStars(self, ratings, metric): + def format_html_stars(self, ratings, metric): if ratings.get(metric, None) == None: return "N/A" stars = min(int(ratings[metric] + 0.5), 5) - fullStars = stars * self.HTML_STAR_FULL - emptyStars = (5 - stars) * self.HTML_STAR_EMPTY - rating = self.formatRating(ratings, metric) - return f"{fullStars}{emptyStars}   " + rating + full_stars = stars * self.HTML_STAR_FULL + empty_stars = (5 - stars) * self.HTML_STAR_EMPTY + rating = self.format_rating(ratings, metric) + return f"{full_stars}{empty_stars}   " + rating class JUnitFormatReport(Report): - def generate(self, analysisId, feedback, args, target): + def generate(self, analysis_id, feedback, args, target): with open("sigrid-ci-output/sigridci-junit-format-report.xml", "w") as fileRef: - fileRef.write(self.generateXML(feedback, target)) + fileRef.write(self.generate_xml(feedback, target)) - def generateXML(self, feedback, target): + def generate_xml(self, feedback, target): dom = minidom.Document() - testSuite = dom.createElement("testsuite") - testSuite.setAttribute("name", "Sigrid CI") - dom.appendChild(testSuite) + test_suite = dom.createElement("testsuite") + test_suite.setAttribute("name", "Sigrid CI") + dom.appendChild(test_suite) - testCase = dom.createElement("testcase") - testCase.setAttribute("classname", "Sigrid CI") - testCase.setAttribute("name", "Maintainability") - testSuite.appendChild(testCase) + test_case = dom.createElement("testcase") + test_case.setAttribute("classname", "Sigrid CI") + test_case.setAttribute("name", "Maintainability") + test_suite.appendChild(test_case) - failures = self.getFailures(feedback, target) + failures = self.get_failures(feedback, target) if len(failures) > 0: failure = dom.createElement("failure") failure.appendChild(dom.createTextNode("Refactoring candidates:\n\n" + "\n".join(failures))) - testCase.appendChild(failure) + test_case.appendChild(failure) return dom.toprettyxml(indent=" ") - def getFailures(self, feedback, target): - if target.meetsQualityTargets(feedback): + def get_failures(self, feedback, target): + if target.meets_quality_targets(feedback): return [] - formatFailure = lambda rc: f"- {rc['subject']}\n ({self.formatMetricName(rc['metric'])}, {rc['category']})" + format_failure = lambda rc: f"- {rc['subject']}\n ({self.format_metric_name(rc['metric'])}, {rc['category']})" - candidates = self.getRefactoringCandidates(feedback, "MAINTAINABILITY")\ - if not target.meetsTargetQualityForMetric(feedback, "MAINTAINABILITY")\ - else self.getFailedRcs(feedback, target) - return [formatFailure(rc) for rc in candidates] + candidates = self.get_refactoring_candidates(feedback, "MAINTAINABILITY")\ + if not target.meets_target_quality_for_metric(feedback, "MAINTAINABILITY")\ + else self.get_failed_rcs(feedback, target) + return [format_failure(rc) for rc in candidates] - def getFailedRcs(self, feedback, target): - lists = [self.getRefactoringCandidates(feedback, m) for m in target.ratings - if not target.meetsTargetQualityForMetric(feedback, m)] + def get_failed_rcs(self, feedback, target): + lists = [self.get_refactoring_candidates(feedback, m) for m in target.ratings + if not target.meets_target_quality_for_metric(feedback, m)] return [item for sublist in lists for item in sublist] class ConclusionReport(Report): - def __init__(self, apiClient, output=sys.stdout): - self.apiClient = apiClient + def __init__(self, api_client, output=sys.stdout): + self.api_client = api_client self.output = output - def generate(self, analysisId, feedback, args, target): - self.printConclusionMessage(feedback, target) - self.printLandingPage(analysisId, feedback, target) + def generate(self, analysis_id, feedback, args, target): + self.print_conclusion_message(feedback, target) + self.print_landing_page(analysis_id, feedback, target) # If you publish(only) we never break the build # We can break the build when running on a branch or pull request. - if not target.meetsQualityTargets(feedback) and not args.publish: + if not target.meets_quality_targets(feedback) and not args.publish: sys.exit(1) - def printConclusionMessage(self, feedback, target): - asciiArt = TextReport(self.output) + def print_conclusion_message(self, feedback, target): + ascii_art = TextReport(self.output) if feedback["newCodeRatings"].get("MAINTAINABILITY", None) == None: - asciiArt.printColor("\n** SIGRID CI RUN COMPLETE: NO FILES CONSIDERED FOR MAINTAINABILITY WERE CHANGED **\n", \ - asciiArt.ANSI_BOLD + asciiArt.ANSI_BLUE) - elif target.meetsQualityTargets(feedback): - asciiArt.printColor("\n** SIGRID CI RUN COMPLETE: YOU WROTE MAINTAINABLE CODE AND REACHED THE TARGET **\n", \ - asciiArt.ANSI_BOLD + asciiArt.ANSI_GREEN) + ascii_art.print_color("\n** SIGRID CI RUN COMPLETE: NO FILES CONSIDERED FOR MAINTAINABILITY WERE CHANGED **\n", \ + ascii_art.ANSI_BOLD + ascii_art.ANSI_BLUE) + elif target.meets_quality_targets(feedback): + ascii_art.print_color("\n** SIGRID CI RUN COMPLETE: YOU WROTE MAINTAINABLE CODE AND REACHED THE TARGET **\n", \ + ascii_art.ANSI_BOLD + ascii_art.ANSI_GREEN) else: - asciiArt.printColor("\n** SIGRID CI RUN COMPLETE: THE CODE YOU WROTE DID NOT MEET THE TARGET FOR MAINTAINABLE CODE **\n", \ - asciiArt.ANSI_BOLD + asciiArt.ANSI_YELLOW) + ascii_art.print_color("\n** SIGRID CI RUN COMPLETE: THE CODE YOU WROTE DID NOT MEET THE TARGET FOR MAINTAINABLE CODE **\n", \ + ascii_art.ANSI_BOLD + ascii_art.ANSI_YELLOW) - def printLandingPage(self, analysisId, feedback, target): - landingPage = self.apiClient.getLandingPage(analysisId, target) + def print_landing_page(self, analysis_id, feedback, target): + landing_page = self.api_client.get_landing_page(analysis_id, target) print("", file=self.output) - print("-" * (len(landingPage) + 4), file=self.output) + print("-" * (len(landing_page) + 4), file=self.output) print("View your analysis results in Sigrid:", file=self.output) - print(f" {landingPage}", file=self.output) - print("-" * (len(landingPage) + 4), file=self.output) + print(f" {landing_page}", file=self.output) + print("-" * (len(landing_page) + 4), file=self.output) print("", file=self.output) @@ -574,87 +572,87 @@ class SigridCiRunner: "remark" ] - def loadSigridTarget(self, apiClient): - objectives = apiClient.fetchObjectives() - targetRating = objectives.get("NEW_CODE_QUALITY", objectives.get("MAINTAINABILITY", 3.5)) - log("Using Sigrid for target rating (%.1f stars)" % targetRating) - return targetRating - - def run(self, apiClient, options, target, reports): - if os.path.exists(f"{options.sourceDir}/sigrid.yml"): + def load_sigrid_target(self, api_client): + objectives = api_client.fetch_objectives() + target_rating = objectives.get("NEW_CODE_QUALITY", objectives.get("MAINTAINABILITY", 3.5)) + log("Using Sigrid for target rating (%.1f stars)" % target_rating) + return target_rating + + def run(self, api_client, options, target, reports): + if os.path.exists(f"{options.source_dir}/sigrid.yml"): log("Found sigrid.yml in repository. Did you mean sigrid.yaml?") sys.exit(1) - systemExists = apiClient.checkSystemExists() - log("Found system in Sigrid" if systemExists else "System is not yet on-boarded to Sigrid") + system_exists = api_client.check_system_exists() + log("Found system in Sigrid" if system_exists else "System is not yet on-boarded to Sigrid") - self.prepareMetadata(options) - self.validateConfigurationFiles(apiClient, options) - analysisId = apiClient.submitUpload(options, systemExists) + self.prepare_metadata(options) + self.validate_configuration_files(api_client, options) + analysis_id = api_client.submit_upload(options, system_exists) - if not systemExists: - log(f"System '{apiClient.urlSystemName}' is on-boarded to Sigrid, and will appear in sigrid-says.com shortly") - elif options.publishOnly: + if not system_exists: + log(f"System '{api_client.url_system_name}' is on-boarded to Sigrid, and will appear in sigrid-says.com shortly") + elif options.publish_only: log("Your project's source code has been published to Sigrid") - self.displayMetadata(apiClient, options) + self.display_metadata(api_client, options) else: - feedback = apiClient.fetchAnalysisResults(analysisId) - self.displayMetadata(apiClient, options) + feedback = api_client.fetch_analysis_results(analysis_id) + self.display_metadata(api_client, options) if not os.path.exists("sigrid-ci-output"): os.mkdir("sigrid-ci-output") for report in reports: - report.generate(analysisId, feedback, args, target) + report.generate(analysis_id, feedback, args, target) - def validateConfigurationFiles(self, apiClient, options): - scope = options.readScopeFile() + def validate_configuration_files(self, api_client, options): + scope = options.read_scope_file() if scope: - self.validateConfiguration(lambda: apiClient.validateScopeFile(scope), "scope configuration file") + self.validateConfiguration(lambda: api_client.validate_scope_file(scope), "scope configuration file") - metadataFile = options.readMetadataFile() + metadataFile = options.read_metadata_file() if metadataFile: - self.validateConfiguration(lambda: apiClient.validateMetadata(metadataFile), "Sigrid metadata file") + self.validateConfiguration(lambda: api_client.validate_metadata(metadataFile), "Sigrid metadata file") - def validateConfiguration(self, validationCall, configurationName): - log(f"Validating {configurationName}") - validationResult = validationCall() - if validationResult["valid"]: + def validateConfiguration(self, validation_call, configuration_name): + log(f"Validating {configuration_name}") + validation_result = validation_call() + if validation_result["valid"]: log("Validation passed") else: log("-" * 80) - log(f"Invalid {configurationName}:") - for note in validationResult["notes"]: + log(f"Invalid {configuration_name}:") + for note in validation_result["notes"]: log(f" - {note}") log("-" * 80) sys.exit(1) - def displayMetadata(self, apiClient, options): - if options.readMetadataFile() == None: + def display_metadata(self, api_client, options): + if options.read_metadata_file() == None: print("") print("Sigrid metadata for this system:") - for key, value in apiClient.fetchMetadata().items(): + for key, value in api_client.fetch_metadata().items(): if value: print(f" {key}:".ljust(20) + str(value)) - def prepareMetadata(self, options): - getMetadataValue = lambda field: os.environ.get(field.lower(), "") - metadata = {field: getMetadataValue(field) for field in self.METADATA_FIELDS if getMetadataValue(field)} + def prepare_metadata(self, options): + get_metadata_value = lambda field: os.environ.get(field.lower(), "") + metadata = {field: get_metadata_value(field) for field in self.METADATA_FIELDS if get_metadata_value(field)} if len(metadata) > 0: - if options.readMetadataFile() != None: + if options.read_metadata_file() != None: raise Exception("Cannot add metadata using environment variables if metadata YAML file is already used") - with open(f"{options.sourceDir}/sigrid-metadata.yaml", "w") as writer: + with open(f"{options.source_dir}/sigrid-metadata.yaml", "w") as writer: writer.write("metadata:\n") for name, value in metadata.items(): - formattedValue = f"[\"{value}\"]" if name in ["teamNames", "supplierNames"] else f"\"{value}\"" - writer.write(f" {name}: {formattedValue}\n") + formatted_value = f"[\"{value}\"]" if name in ["teamNames", "supplierNames"] else f"\"{value}\"" + writer.write(f" {name}: {formatted_value}\n") - def isValidSystemName(self, customerName, systemName): - return self.SYSTEM_NAME_PATTERN.match(systemName) and \ - len(systemName) >= self.SYSTEM_NAME_LENGTH.start and \ - (len(systemName) + len(customerName) + 1) in self.SYSTEM_NAME_LENGTH + def is_valid_system_name(self, customer_name, system_name): + return self.SYSTEM_NAME_PATTERN.match(system_name) and \ + len(system_name) >= self.SYSTEM_NAME_LENGTH.start and \ + (len(system_name) + len(customer_name) + 1) in self.SYSTEM_NAME_LENGTH if __name__ == "__main__": @@ -695,17 +693,17 @@ def isValidSystemName(self, customerName, systemName): log("Starting Sigrid CI") options = UploadOptions(args.source, args.exclude.split(","), args.include_history, args.subsystem, args.showupload, args.publishonly) - apiClient = SigridApiClient(args) - reports = [TextReport(), StaticHtmlReport(), JUnitFormatReport(), ConclusionReport(apiClient)] + api_client = Sigridapi_client(args) + reports = [TextReport(), StaticHtmlReport(), JUnitFormatReport(), ConclusionReport(api_client)] runner = SigridCiRunner() - if not runner.isValidSystemName(args.customer, args.system): + if not runner.is_valid_system_name(args.customer, args.system): maxNameLength = runner.SYSTEM_NAME_LENGTH.stop - (len(args.customer) + 1) print(f"Invalid system name, system name should match '{runner.SYSTEM_NAME_PATTERN.pattern}' " f"and be {runner.SYSTEM_NAME_LENGTH.start} to {maxNameLength} characters long (inclusive).") sys.exit(1) - targetRating = runner.loadSigridTarget(apiClient) if args.targetquality == "sigrid" else float(args.targetquality) - target = TargetQuality(options.readScopeFile() or "", targetRating) - runner.run(apiClient, options, target, reports) + target_rating = runner.load_sigrid_target(api_client) if args.targetquality == "sigrid" else float(args.targetquality) + target = TargetQuality(options.read_scope_file() or "", target_rating) + runner.run(api_client, options, target, reports)