Skip to content

Commit

Permalink
Merge pull request #127 from kelnage/distinct-condition-state
Browse files Browse the repository at this point in the history
Distinct condition state
  • Loading branch information
thomaspatzke committed Jun 28, 2023
2 parents f34a659 + 3cf5e8a commit 50a948d
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 6 deletions.
15 changes: 9 additions & 6 deletions sigma/conversion/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,23 +105,26 @@ def convert_rule(self, rule : SigmaRule, output_format : Optional[str] = None) -
"""
Convert a single Sigma rule into the target data structure (usually query, see above).
"""
state = ConversionState()
try:
self.last_processing_pipeline = self.backend_processing_pipeline + self.processing_pipeline + self.output_format_processing_pipeline[output_format or self.default_format]

error_state = "applying processing pipeline on"
self.last_processing_pipeline.apply(rule) # 1. Apply transformations
state.processing_state = self.last_processing_pipeline.state

# 2. Convert conditions
error_state = "converting"
queries = [ # 2. Convert condition
self.convert_condition(cond.parsed, state)
for cond in rule.detection.parsed_condition
states = [
ConversionState(processing_state=dict(self.last_processing_pipeline.state))
for _ in rule.detection.parsed_condition
]
queries = [
self.convert_condition(cond.parsed, states[index])
for index, cond in enumerate(rule.detection.parsed_condition)
]

error_state = "finalizing query for"
return [ # 3. Postprocess generated query
self.finalize_query(rule, query, index, state, output_format or self.default_format)
self.finalize_query(rule, query, index, states[index], output_format or self.default_format)
for index, query in enumerate(queries)
]
except SigmaError as e:
Expand Down
22 changes: 22 additions & 0 deletions tests/test_conversion_deferred.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,28 @@ def test_deferred_conversion_or(test_backend : TextQueryTestBackend):
""")
) == ['fieldB="foo" or fieldC="bar" | mappedA="foo.*bar"']

def test_deferred_conversion_multiple_cond(test_backend : TextQueryTestBackend):
assert test_backend.convert(
SigmaCollection.from_yaml("""
title: Test
status: test
logsource:
category: test_category
product: test_product
detection:
sel1:
fieldA|re: foo.*bar
sel2:
fieldB|re: foo.*
sel3:
fieldC|re: .*bar
condition:
- sel1
- sel2
- sel3
""")
) == ['* | mappedA="foo.*bar"', '* | fieldB="foo.*"', '* | fieldC=".*bar"']

def test_deferred_conversion_not(test_backend : TextQueryTestBackend):
assert test_backend.convert(
SigmaCollection.from_yaml("""
Expand Down

0 comments on commit 50a948d

Please sign in to comment.