Skip to content

Commit

Permalink
Processing item conditions as dict with checks moved to base class
Browse files Browse the repository at this point in the history
  • Loading branch information
thomaspatzke committed Jul 16, 2024
1 parent 087ba7d commit 9c33f27
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 58 deletions.
103 changes: 51 additions & 52 deletions sigma/processing/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,13 @@ class ProcessingItemBase:
transformation: Transformation
rule_condition_linking: Optional[Callable[[Iterable[bool]], bool]] = None # any or all
rule_condition_negation: bool = False
rule_conditions: List[RuleProcessingCondition] = field(default_factory=list)
rule_conditions: Union[List[RuleProcessingCondition], Dict[str, RuleProcessingCondition]] = (
field(default_factory=list)
)
rule_condition_logic: Optional[str] = (
None # Full rule condition logic mutually exclusive to linking and negation.
)

identifier: Optional[str] = None

@classmethod
Expand All @@ -65,24 +68,40 @@ def _base_args_from_dict(
"transformation": cls._instantiate_transformation(d, transformations),
}

def __post_init__(self):
if self.rule_condition_logic is not None:
if self.rule_condition_linking is not None:
def _check_conditions(
self, logic_value, linking_attr, conditions, expected_condition_class, name
):
if logic_value is not None:
if self.__getattribute__(linking_attr) is not None:
raise SigmaConfigurationError(f"{name} logic is mutually exclusive to linking.")
if not isinstance(conditions, dict):
raise SigmaConfigurationError(
"Rule condition logic is mutually exclusive to linking."
f"{name}s must be provided as mapping from identifiers to conditions if condition logic is provided."
)
else:
if self.rule_condition_linking is None:
self.rule_condition_linking = all
if self.__getattribute__(linking_attr) is None:
self.__setattr__(linking_attr, all)

if not isinstance(self.rule_conditions, list):
raise SigmaTypeError("Rule processing conditions must be provided as list")
for rule_condition in self.rule_conditions:
if not isinstance(rule_condition, RuleProcessingCondition):
if not isinstance(conditions, (list, dict)):
raise SigmaTypeError(f"{name}s must be provided as list or dict")
if isinstance(conditions, dict):
conditions_list = conditions.values()
else:
conditions_list = conditions
for condition in conditions_list:
if not isinstance(condition, expected_condition_class):
raise SigmaTypeError(
f"Rule processing condition '{str(rule_condition)}' is not a RuleProcessingCondition"
f"{name} '{str(condition)}' is not a {expected_condition_class.__name__}"
)

def __post_init__(self):
self._check_conditions(
self.rule_condition_logic,
"rule_condition_linking",
self.rule_conditions,
RuleProcessingCondition,
"Rule condition",
)
self.transformation.set_processing_item(
self
) # set processing item in transformation object after it is instantiated
Expand Down Expand Up @@ -212,11 +231,15 @@ class ProcessingItem(ProcessingItemBase):
None # any or all
)
detection_item_condition_negation: bool = False
detection_item_conditions: List[DetectionItemProcessingCondition] = field(default_factory=list)
detection_item_conditions: Union[
List[DetectionItemProcessingCondition], Dict[str, DetectionItemProcessingCondition]
] = field(default_factory=list)
detection_item_condition_logic: Optional[str] = None
field_name_condition_linking: Optional[Callable[[Iterable[bool]], bool]] = None # any or all
field_name_condition_negation: bool = False
field_name_conditions: List[FieldNameProcessingCondition] = field(default_factory=list)
field_name_conditions: Union[
List[FieldNameProcessingCondition], Dict[str, FieldNameProcessingCondition]
] = field(default_factory=list)
field_name_condition_logic: Optional[str] = None

@classmethod
Expand Down Expand Up @@ -248,44 +271,20 @@ def from_dict(cls, d: dict):

def __post_init__(self):
super().__post_init__()
if self.detection_item_condition_logic is not None:
if self.detection_item_condition_linking is not None:
raise SigmaConfigurationError(
"Detection item condition logic is mutually exclusive to linking."
)
else:
if self.detection_item_condition_linking is None:
self.detection_item_condition_linking = all
if self.field_name_condition_logic is not None:
if self.field_name_condition_linking is not None:
raise SigmaConfigurationError(
"Field name condition logic is mutually exclusive to linking."
)
else:
if self.field_name_condition_linking is None:
self.field_name_condition_linking = all

if not isinstance(self.rule_conditions, list):
raise SigmaTypeError("Rule processing conditions must be provided as list")
for rule_condition in self.rule_conditions:
if not isinstance(rule_condition, RuleProcessingCondition):
raise SigmaTypeError(
f"Rule processing condition '{str(rule_condition)}' is not a RuleProcessingCondition"
)
if not isinstance(self.detection_item_conditions, list):
raise SigmaTypeError("Detection item processing conditions must be provided as list")
for detection_item_condition in self.detection_item_conditions:
if not isinstance(detection_item_condition, DetectionItemProcessingCondition):
raise SigmaTypeError(
f"Detection item processing condition '{str(detection_item_condition)}' is not a DetectionItemProcessingCondition"
)
if not isinstance(self.field_name_conditions, list):
raise SigmaTypeError("Field name processing conditions must be provided as list")
for field_name_condition in self.field_name_conditions:
if not isinstance(field_name_condition, FieldNameProcessingCondition):
raise SigmaTypeError(
f"Detection item processing condition '{str(field_name_condition)}' is not a FieldNameProcessingCondition"
)
self._check_conditions(
self.detection_item_condition_logic,
"detection_item_condition_linking",
self.detection_item_conditions,
DetectionItemProcessingCondition,
"Detection item condition",
)
self._check_conditions(
self.field_name_condition_logic,
"field_name_condition_linking",
self.field_name_conditions,
FieldNameProcessingCondition,
"Field name condition",
)

def apply(
self, pipeline: "ProcessingPipeline", rule: Union[SigmaRule, SigmaCorrelationRule]
Expand Down
32 changes: 26 additions & 6 deletions tests/test_processing_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -439,24 +439,24 @@ def test_processingitem_match_detection_item_negated_false(
assert processing_item.match_detection_item(dummy_processing_pipeline, detection_item)


def test_processingitem_rule_condition_nolist():
with pytest.raises(SigmaTypeError, match="Rule processing conditions"):
def test_processingitem_rule_condition_no_list_or_dict():
with pytest.raises(SigmaTypeError, match="Rule conditions"):
ProcessingItem(
rule_conditions=LogsourceCondition(category="test"),
transformation=SetStateTransformation("test", True),
)


def test_processingitem_detection_item_condition_nolist():
with pytest.raises(SigmaTypeError, match="Detection item processing conditions"):
def test_processingitem_detection_item_condition_no_list_or_dict():
with pytest.raises(SigmaTypeError, match="Detection item conditions"):
ProcessingItem(
detection_item_conditions=DetectionItemProcessingItemAppliedCondition("test"),
transformation=SetStateTransformation("test", True),
)


def test_processingitem_field_name_condition_nolist():
with pytest.raises(SigmaTypeError, match="Field name processing conditions"):
def test_processingitem_field_name_condition_no_list_or_dict():
with pytest.raises(SigmaTypeError, match="Field name conditions"):
ProcessingItem(
field_name_conditions=IncludeFieldCondition(fields=["test"]),
transformation=SetStateTransformation("test", True),
Expand All @@ -471,6 +471,14 @@ def test_processingitem_wrong_rule_condition():
)


def test_processingitem_wrong_rule_condition_dict():
with pytest.raises(SigmaTypeError, match="RuleProcessingCondition"):
ProcessingItem(
rule_conditions={"cond": IncludeFieldCondition(fields=["testfield"])},
transformation=SetStateTransformation("test", True),
)


def test_processingitem_rule_condition_linking_with_logic():
with pytest.raises(SigmaConfigurationError, match="Rule condition logic is mutually exclusive"):
ProcessingItem(
Expand Down Expand Up @@ -514,6 +522,18 @@ def test_processingitem_field_name_condition_linking_with_logic():
)


def test_processingitem_rule_condition_logic_with_list():
with pytest.raises(SigmaConfigurationError, match="mapping from identifiers to conditions"):
ProcessingItem(
rule_conditions=[
RuleConditionTrue(dummy="test-true"),
RuleConditionFalse(dummy="test-false"),
],
rule_condition_logic="cond1 or cond2",
transformation=SetStateTransformation("test", True),
)


def test_processingitem_wrong_detection_item_condition():
with pytest.raises(SigmaTypeError, match="DetectionItemProcessingCondition"):
ProcessingItem(
Expand Down

0 comments on commit 9c33f27

Please sign in to comment.