Skip to content

Commit

Permalink
Rule backreferences and ordering
Browse files Browse the repository at this point in the history
  • Loading branch information
thomaspatzke committed Nov 6, 2023
1 parent 4a8fe5b commit 48aede0
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 14 deletions.
10 changes: 10 additions & 0 deletions sigma/collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,18 @@ def __post_init__(self):
self.ids_to_rules[rule.id] = rule
if rule.name is not None:
self.names_to_rules[rule.name] = rule

def resolve_rule_references(self):
"""
Resolve rule references in correlation rules to the actual rule objects and sort the rules
by reference order (rules that are referenced by other rules come first).
This must be called before referencing rules are converted into queries to make references available.
"""
for rule in self.rules:
if isinstance(rule, SigmaCorrelationRule):
rule.resolve_rule_references(self)
self.rules = list(sorted(self.rules))

@classmethod
def from_dicts(
Expand Down
5 changes: 3 additions & 2 deletions sigma/correlations.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,5 +269,6 @@ def resolve_rule_references(self, rule_collection: "sigma.collection.SigmaCollec
Raises:
sigma_exceptions.SigmaRuleNotFoundError: If a referenced rule cannot be found in the given rule collection.
"""
for rule in self.rules:
rule.resolve(rule_collection)
for rule_ref in self.rules:
rule_ref.resolve(rule_collection)
rule_ref.rule.add_backreference(self)
16 changes: 16 additions & 0 deletions sigma/rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -614,6 +614,10 @@ class SigmaRuleBase:
source: Optional[SigmaRuleLocation] = field(default=None, compare=False)
custom_attributes: Dict[str, Any] = field(compare=False, default_factory=dict)

_backreferences: List["SigmaRuleBase"] = field(
init=False, default_factory=list, repr=False, compare=False
)

def __post_init__(self):
for field in ("references", "tags", "fields", "falsepositives"):
if self.__getattribute__(field) is None:
Expand Down Expand Up @@ -804,6 +808,18 @@ def to_dict(self) -> dict:

return d

def add_backreference(self, rule: "SigmaRuleBase"):
"""Add backreference to another rule."""
self._backreferences.append(rule)

def referenced_by(self, rule: "SigmaRuleBase") -> bool:
"""Check if rule is referenced by another rule."""
return rule in self._backreferences

def __lt__(self, other: "SigmaRuleBase") -> bool:
"""Sort rules by backreference. A rule referenced by another rule is smaller."""
return self.referenced_by(other)


@dataclass
class SigmaRule(SigmaRuleBase, ProcessingItemTrackingMixin):
Expand Down
24 changes: 13 additions & 11 deletions tests/test_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -323,8 +323,18 @@ def test_index_rule_by_name_not_existing(ruleset):

@pytest.fixture
def rules_with_correlation():
return SigmaCollection.from_yaml(
rule_collection = SigmaCollection.from_yaml(
"""
title: Correlating 1+2
name: corr-1-2
correlation:
type: temporal
rules:
- rule-1
- rule-2
group-by: user
timespan: 5m
---
title: Rule 1
name: rule-1
logsource:
Expand All @@ -344,18 +354,10 @@ def rules_with_correlation():
selection:
ImageFile|endswith: '\\\\b.exe'
condition: selection
---
title: Correlating 1+2
name: corr-1-2
correlation:
type: temporal
rules:
- rule-1
- rule-2
group-by: user
timespan: 5m
"""
)
rule_collection.resolve_rule_references()
return rule_collection


def test_load_ruleset_with_correlation(rules_with_correlation):
Expand Down
4 changes: 3 additions & 1 deletion tests/test_correlations.py
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,9 @@ def test_correlation_condition_to_dict():

def test_correlation_resolve_rule_references(rule_collection, correlation_rule):
correlation_rule.resolve_rule_references(rule_collection)
assert correlation_rule.rules[0].rule == rule_collection["failed_login"]
rule = rule_collection["failed_login"]
assert correlation_rule.rules[0].rule == rule
assert rule.referenced_by(correlation_rule)


def test_correlation_resolve_rule_references_invalid_reference(correlation_rule):
Expand Down
23 changes: 23 additions & 0 deletions tests/test_rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -1142,3 +1142,26 @@ def test_sigma_rule_overlapping_selections():
and all((isinstance(arg, ConditionAND) for arg in cond.args))
and [len(ands.args) for ands in cond.args] == [2, 4]
)


def test_sigma_rule_backreference(sigma_rule):
sigma_rule_2 = SigmaRule.from_dict(
{
"title": "Test",
"logsource": {
"category": "process_creation",
"product": "windows",
},
"detection": {
"selection": {
"CommandLine|endswith": "test.exe",
},
"condition": "selection",
},
}
)
sigma_rule.add_backreference(sigma_rule_2)
assert sigma_rule.referenced_by(sigma_rule_2)
assert sigma_rule < sigma_rule_2
assert not sigma_rule_2.referenced_by(sigma_rule)
assert not sigma_rule_2 < sigma_rule

0 comments on commit 48aede0

Please sign in to comment.