Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: Analyze logsources #53

Merged
merged 2 commits into from
Jul 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 37 additions & 73 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 11 additions & 4 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,23 @@ pysigma = "^0.11.9"
colorama = "^0.4.6"

[tool.poetry.dev-dependencies]
pytest = "^6.2.2"
pytest-cov = "^2.11.1"
pytest = "^7.3.0"
pytest-cov = "^4.0.0"
defusedxml = "^0.7.1"

[tool.poetry.scripts]
sigma = "sigma.cli.main:main"

[tool.pytest.ini_options]
python_paths = ["."]
testpaths = ["tests"]
minversion = "6.0"
python_files = "test_*.py"
addopts = "--cov=sigma --cov-report term --cov-report xml:cov.xml"
testpaths = [
"tests",
]
filterwarnings = [
'ignore:Unverified HTTPS request'
]

[build-system]
requires = ["poetry-core>=1.0.0"]
Expand Down
55 changes: 55 additions & 0 deletions sigma/analyze/stats.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import copy
from typing import Dict, List
from sigma.rule import SigmaRule, SigmaLevel
from sigma.collection import SigmaCollection


# Maps a rule's SigmaLevel (or None, for rules without a level) to the
# human-readable column name used in the per-logsource stats table.
# The None key makes the lookup total: every rule maps to some column.
rule_level_mapping = {
    None: "None",
    SigmaLevel.INFORMATIONAL: "Informational",
    SigmaLevel.LOW: "Low",
    SigmaLevel.MEDIUM: "Medium",
    SigmaLevel.HIGH: "High",
    SigmaLevel.CRITICAL: "Critical",
}

# Blank counter row for one logsource category: an overall total plus one
# bucket per severity column. Copied (never mutated) for each new category,
# and its insertion order defines the table's column order.
template_stat_detail = {
    column: 0
    for column in (
        "Overall",
        "Critical",
        "High",
        "Medium",
        "Low",
        "Informational",
        "None",
    )
}


def format_row(row: List, column_widths: List[int]) -> str:
    """Render one table row as a ``" | "``-separated, column-aligned string.

    Each cell is stringified and left-justified to the width of its column.
    Extra cells beyond ``column_widths`` (or vice versa) are silently
    dropped by ``zip``.

    :param row: Cell values for one row (any stringifiable items).
    :param column_widths: Minimum width of each column, in characters.
    :return: The formatted row, without a trailing newline.
    """
    # str(...).ljust(...) already yields a str; no f-string wrapper needed.
    return " | ".join(
        str(item).ljust(width) for item, width in zip(row, column_widths)
    )


def get_rulelevel_mapping(rule: SigmaRule) -> str:
    """Return the display column name for *rule*'s severity level.

    Looks the rule's ``level`` up in ``rule_level_mapping``; a rule
    without a level (``rule.level is None``) maps to ``"None"``.

    :param rule: Rule whose severity level should be translated.
    :return: Human-readable level name ("Critical", "High", ..., "None").
    :raises KeyError: If ``rule.level`` is not a known SigmaLevel.
    """
    return rule_level_mapping[rule.level]


def create_logsourcestats(rules: SigmaCollection) -> Dict[str, Dict[str, int]]:
    """Count rules per logsource category, broken down by severity level.

    Iterates all rules and, for each rule that has a ``logsource``, bumps
    the "Overall" counter and the counter of the rule's severity column in
    that logsource category's row.

    :param rules: Collection of Sigma rules to aggregate.
    :return: Mapping of logsource category name to a counter dict with the
        columns of ``template_stat_detail`` ("Overall", "Critical", ...).
        Note: a category may be ``None`` if a rule's logsource has no
        category set.
    """
    stats: Dict[str, Dict[str, int]] = {}

    for rule in rules:
        # Rules without a logsource cannot be categorized; skip them.
        if not hasattr(rule, "logsource"):
            continue

        category = rule.logsource.category
        # Lazily create a fresh counter row for a new category.
        detail = stats.setdefault(category, copy.deepcopy(template_stat_detail))
        detail["Overall"] += 1
        detail[get_rulelevel_mapping(rule)] += 1

    return stats
56 changes: 56 additions & 0 deletions sigma/cli/analyze.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
mitre_attack_techniques_tactics_mapping,
mitre_attack_version,
)
from sigma.analyze.stats import create_logsourcestats, format_row


@click.group(name="analyze", help="Analyze Sigma rule sets")
Expand Down Expand Up @@ -126,3 +127,58 @@ def analyze_attack(
"techniques": layer_techniques,
}
json.dump(layer, output, indent=2)

@analyze_group.command(name="logsource", help="Create stats about logsources.")
@click.option(
    "--file-pattern",
    "-P",
    default="*.yml",
    show_default=True,
    help="Pattern for file names to be included in recursion into directories.",
)
@click.option(
    "--sort-by",
    "-k",
    type=str,
    default="Overall",
    show_default=True,
    help="Sort by column.",
)
@click.argument(
    "output",
    type=click.File("w"),
)
@click.argument(
    "input",
    nargs=-1,
    required=True,
    type=click.Path(exists=True, allow_dash=True, path_type=pathlib.Path),
)
def analyze_logsource(
    file_pattern,
    sort_by,
    output,
    input,
):
    """Print a table of per-logsource rule counts, grouped by severity.

    Loads all rules matching *file_pattern* from *input*, aggregates them
    with ``create_logsourcestats`` and writes an aligned text table to
    *output*, sorted descending by the *sort_by* column.
    """
    rules = load_rules(input, file_pattern)
    stats = create_logsourcestats(rules)

    # Guard: next(iter(...)) below would raise StopIteration on an empty
    # mapping (e.g. no loaded rule had a logsource).
    if not stats:
        print("No logsource statistics found.", file=output)
        return

    # Extract column header; all rows share template_stat_detail's columns.
    headers = ["Logsource"] + list(next(iter(stats.values())).keys())

    # Validate --sort-by before .index() turns a typo into a traceback.
    if sort_by not in headers:
        raise click.BadParameter(
            f"'{sort_by}' is not a column; choose one of: {', '.join(headers)}",
            param_hint="--sort-by",
        )

    # Prepare rows, sorted descending by the requested column.
    rows = [[key] + list(value.values()) for key, value in stats.items()]
    sort_index = headers.index(sort_by)
    rows.sort(key=lambda x: x[sort_index], reverse=True)

    # Determine the width of each column from its widest cell or header.
    column_widths = [
        max(len(str(item)) for item in column) for column in zip(*([headers] + rows))
    ]

    # Print table with a rule line above and below the header.
    separator = "-+-".join("-" * width for width in column_widths)
    print(separator, file=output)
    print(format_row(headers, column_widths), file=output)
    print(separator, file=output)
    for row in rows:
        print(format_row(row, column_widths), file=output)
Loading
Loading