Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update errors field in JSON Report #108

Merged
merged 8 commits into from
Feb 26, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 25 additions & 4 deletions modelscan/error.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,23 @@
from typing import Optional
from enum import Enum


class ErrorCategories(Enum):
    """Closed set of categories attached to a scan error.

    The member *name* (not the integer value) is what the JSON report
    emits in each error's ``"category"`` field.
    """

    # Used in this file's callers for scanner import failures.
    MODEL_SCAN = 1
    DEPENDENCY = 2
    # Used when the path handed to the scanner is not valid.
    PATH = 3
    # Used when a zip archive contains another zip archive.
    NESTED_ZIP = 4
    # NOTE(review): the three members below are not referenced in the
    # visible code; presumably raised by individual scanners — confirm.
    PICKLE_GENOPS = 5
    MAGIC_NUMBER = 6
    JSON_DECODE = 7


class Error:
scan_name: str
category: ErrorCategories
message: Optional[str]
source: Optional[str]

def __init__(self) -> None:
pass

Expand All @@ -10,12 +26,17 @@ def __str__(self) -> str:


class ModelScanError(Error):
    """Error recorded while running a named scan.

    The attribute declarations (``scan_name``, ``category``, ``message``,
    ``source``) live on the ``Error`` base class; this class only
    populates them and provides the human-readable rendering.
    """

    def __init__(
        self,
        scan_name: str,
        category: ErrorCategories,
        message: Optional[str] = None,
        source: Optional[str] = None,
    ) -> None:
        """Store the details of a scan error.

        Args:
            scan_name: Name of the scan (or scanner path) that failed.
            category: Category used to classify the error in reports.
            message: Description of the failure; a missing or empty
                message is stored as the string ``"None"``.
            source: Path-like identifier of the file the error relates
                to, or ``None`` when the error is not tied to a file.
        """
        self.scan_name = scan_name
        self.category = category
        # Keep the "None" placeholder so the text produced by __str__ is
        # unchanged for errors created without a message.
        self.message = message or "None"
        # Only stringify a source that was actually supplied: str(None)
        # would yield the literal "None", indistinguishable from a file
        # with that name and at odds with the Optional[str] declaration.
        self.source = None if source is None else str(source)

    def __str__(self) -> str:
        """Return the one-line summary followed by the stored message."""
        return f"The following error was raised during a {self.scan_name} scan: \n{self.message}"
121 changes: 97 additions & 24 deletions modelscan/modelscan.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@
from modelscan.settings import DEFAULT_SETTINGS

from pathlib import Path
from typing import List, Union, Optional, IO, Dict, Tuple, Any
from typing import List, Union, Optional, IO, Dict, Any
from datetime import datetime

from modelscan.error import Error, ModelScanError
from modelscan.error import ModelScanError, ErrorCategories
from modelscan.skip import ModelScanSkipped, SkipCategories
from modelscan.issues import Issues, IssueSeverity
from modelscan.scanners.scan import ScanBase
from modelscan.tools.utils import _is_zipfile
Expand All @@ -24,9 +25,9 @@ def __init__(
) -> None:
# Output
self._issues = Issues()
self._errors: List[Error] = []
self._init_errors: List[Error] = []
self._skipped: List[str] = []
self._errors: List[ModelScanError] = []
self._init_errors: List[ModelScanError] = []
self._skipped: List[ModelScanSkipped] = []
self._scanned: List[str] = []
self._input_path: str = ""

Expand Down Expand Up @@ -54,7 +55,9 @@ def _load_scanners(self) -> None:
logger.error(f"Error importing scanner {scanner_path}")
self._init_errors.append(
ModelScanError(
scanner_path, f"Error importing scanner {scanner_path}: {e}"
scanner_path,
ErrorCategories.MODEL_SCAN,
f"Error importing scanner: {e}",
)
)

Expand Down Expand Up @@ -86,13 +89,25 @@ def _scan_path(
):
self._scan_zip(path)
elif not scanned:
self._skipped.append(str(path))
# check if added to skipped already
willarmiros marked this conversation as resolved.
Show resolved Hide resolved
all_skipped_files = [skipped.source for skipped in self._skipped]
if str(path) not in all_skipped_files:
self._skipped.append(
ModelScanSkipped(
"ModelScan",
SkipCategories.SCAN_NOT_SUPPORTED,
f"Model Scan did not scan file",
str(path),
)
)

else:
logger.error(f"Error: path {path} is not valid")
self._errors.append(
ModelScanError("ModelScan", f"Path {path} is not valid")
ModelScanError(
"ModelScan", ErrorCategories.PATH, "Path is not valid", str(Path)
)
)
self._skipped.append(str(path))

def _scan_directory(self, directory_path: Path) -> None:
for path in directory_path.rglob("*"):
Expand All @@ -111,12 +126,21 @@ def _scan_source(
source=source,
data=data,
)

if scan_results is not None:
logger.info(f"Scanning {source} using {scanner.full_name()} model scan")
self._scanned.append(str(source))
self._issues.add_issues(scan_results.issues)
self._errors.extend(scan_results.errors)
scanned = True
logger.info(f"Scanning {source} using {scanner.full_name()} model scan")
if scan_results.errors:
self._errors.extend(scan_results.errors)
elif scan_results.issues:
self._scanned.append(str(source))
self._issues.add_issues(scan_results.issues)

elif scan_results.skipped:
self._skipped.extend(scan_results.skipped)
else:
self._scanned.append(str(source))

return scanned

def _scan_zip(
Expand All @@ -131,18 +155,42 @@ def _scan_zip(
source=f"{source}:{file_name}",
data=file_io,
)

if not scanned:
if _is_zipfile(file_name, data=file_io):
self._errors.append(
ModelScanError(
"ModelScan",
f"{source}:{file_name} is a zip file. ModelScan does not support nested zip files.",
ErrorCategories.NESTED_ZIP,
"ModelScan does not support nested zip files.",
f"{source}:{file_name}",
)
)

# check if added to skipped already
all_skipped_files = [
skipped.source for skipped in self._skipped
]
if f"{source}:{file_name}" not in all_skipped_files:
self._skipped.append(
ModelScanSkipped(
"ModelScan",
SkipCategories.SCAN_NOT_SUPPORTED,
f"Model Scan did not scan file",
f"{source}:{file_name}",
)
)
self._skipped.append(f"{source}:{file_name}")

except zipfile.BadZipFile as e:
logger.debug(f"Skipping zip file {source}, due to error", e, exc_info=True)
self._skipped.append(str(source))
self._skipped.append(
ModelScanSkipped(
"ModelScan",
SkipCategories.BAD_ZIP,
f"Skipping zip file due to error: {e}",
f"{source}:{file_name}",
)
)

def _generate_results(self) -> Dict[str, Any]:
report: Dict[str, Any] = {}
Expand All @@ -168,11 +216,7 @@ def _generate_results(self) -> Dict[str, Any]:
report["summary"]["absolute_path"] = str(absolute_path)
report["summary"]["modelscan_version"] = __version__
report["summary"]["timestamp"] = datetime.now().isoformat()
report["summary"]["skipped"] = {"total_skipped": len(self._skipped)}
report["summary"]["skipped"]["skipped_files"] = [
str(Path(file_name).relative_to(Path(absolute_path)))
for file_name in self._skipped
]

report["summary"]["scanned"] = {"total_scanned": len(self._scanned)}
report["summary"]["scanned"]["scanned_files"] = [
str(Path(file_name).relative_to(Path(absolute_path)))
Expand All @@ -188,7 +232,36 @@ def _generate_results(self) -> Dict[str, Any]:
Path(issue["source"]).relative_to(Path(absolute_path))
)

report["errors"] = [str(error) for index, error in enumerate(self._errors)]
all_errors = []

for error in self._errors:
error_information = {}
error_information["category"] = str(error.category.name)
if error.message is not None:
error_information["description"] = error.message
if hasattr(error, "source"):
error_information["source"] = str(
Path(str(error.source)).relative_to(Path(absolute_path))
)

all_errors.append(error_information)

report["errors"] = all_errors

report["summary"]["skipped"] = {"total_skipped": len(self._skipped)}

all_skipped_files = []

for skipped_file in self._skipped:
skipped_file_information = {}
skipped_file_information["category"] = str(skipped_file.category.name)
skipped_file_information["description"] = str(skipped_file.message)
skipped_file_information["source"] = str(
Path(skipped_file.source).relative_to(Path(absolute_path))
)
all_skipped_files.append(skipped_file_information)

report["summary"]["skipped"]["skipped_files"] = all_skipped_files

return report

Expand All @@ -211,13 +284,13 @@ def issues(self) -> Issues:
return self._issues

@property
def errors(self) -> List[ModelScanError]:
    """Return the errors collected while scanning.

    This is the live ``_errors`` list, not a copy. Scanner import
    failures are kept separately in ``_init_errors`` and are not
    included here.
    """
    return self._errors

@property
def scanned(self) -> List[str]:
    """Return the sources recorded as scanned, as strings.

    This is the live ``_scanned`` list, not a copy.
    """
    return self._scanned

@property
def skipped(self) -> List[ModelScanSkipped]:
    """Return the records of files that were skipped.

    Each entry is a ``ModelScanSkipped`` carrying the skip category,
    a message and the source it refers to. This is the live
    ``_skipped`` list, not a copy.
    """
    return self._skipped
Loading
Loading