diff --git a/sigma/processing/pipeline.py b/sigma/processing/pipeline.py index 539bfb2e..fd62529c 100644 --- a/sigma/processing/pipeline.py +++ b/sigma/processing/pipeline.py @@ -42,10 +42,13 @@ class ProcessingItemBase: transformation: Transformation rule_condition_linking: Optional[Callable[[Iterable[bool]], bool]] = None # any or all rule_condition_negation: bool = False - rule_conditions: List[RuleProcessingCondition] = field(default_factory=list) + rule_conditions: Union[List[RuleProcessingCondition], Dict[str, RuleProcessingCondition]] = ( + field(default_factory=list) + ) rule_condition_logic: Optional[str] = ( None # Full rule condition logic mutually exclusive to linking and negation. ) + identifier: Optional[str] = None @classmethod @@ -65,24 +68,40 @@ def _base_args_from_dict( "transformation": cls._instantiate_transformation(d, transformations), } - def __post_init__(self): - if self.rule_condition_logic is not None: - if self.rule_condition_linking is not None: + def _check_conditions( + self, logic_value, linking_attr, conditions, expected_condition_class, name + ): + if logic_value is not None: + if self.__getattribute__(linking_attr) is not None: + raise SigmaConfigurationError(f"{name} logic is mutually exclusive to linking.") + if not isinstance(conditions, dict): raise SigmaConfigurationError( - "Rule condition logic is mutually exclusive to linking." + f"{name}s must be provided as mapping from identifiers to conditions if condition logic is provided." ) else: - if self.rule_condition_linking is None: - self.rule_condition_linking = all + if self.__getattribute__(linking_attr) is None: + self.__setattr__(linking_attr, all) - if not isinstance(self.rule_conditions, list): - raise SigmaTypeError("Rule processing conditions must be provided as list") - for rule_condition in self.rule_conditions: - if not isinstance(rule_condition, RuleProcessingCondition): + if not isinstance(conditions, (list, dict)): + raise SigmaTypeError(f"{name}s must be provided as list or dict") + if isinstance(conditions, dict): + conditions_list = conditions.values() + else: + conditions_list = conditions + for condition in conditions_list: + if not isinstance(condition, expected_condition_class): raise SigmaTypeError( - f"Rule processing condition '{str(rule_condition)}' is not a RuleProcessingCondition" + f"{name} '{str(condition)}' is not a {expected_condition_class.__name__}" ) + def __post_init__(self): + self._check_conditions( + self.rule_condition_logic, + "rule_condition_linking", + self.rule_conditions, + RuleProcessingCondition, + "Rule condition", + ) self.transformation.set_processing_item( self ) # set processing item in transformation object after it is instantiated @@ -212,11 +231,15 @@ class ProcessingItem(ProcessingItemBase): None # any or all ) detection_item_condition_negation: bool = False - detection_item_conditions: List[DetectionItemProcessingCondition] = field(default_factory=list) + detection_item_conditions: Union[ + List[DetectionItemProcessingCondition], Dict[str, DetectionItemProcessingCondition] + ] = field(default_factory=list) detection_item_condition_logic: Optional[str] = None field_name_condition_linking: Optional[Callable[[Iterable[bool]], bool]] = None # any or all field_name_condition_negation: bool = False - field_name_conditions: List[FieldNameProcessingCondition] = field(default_factory=list) + field_name_conditions: Union[ + List[FieldNameProcessingCondition], Dict[str, FieldNameProcessingCondition] + ] = field(default_factory=list) field_name_condition_logic: Optional[str] = None @classmethod @@ -248,44 +271,20 @@ def from_dict(cls, d: dict): def __post_init__(self): super().__post_init__() - if self.detection_item_condition_logic is not None: - if self.detection_item_condition_linking is not None: - raise SigmaConfigurationError( - "Detection item condition logic is mutually exclusive to linking." - ) - else: - if self.detection_item_condition_linking is None: - self.detection_item_condition_linking = all - if self.field_name_condition_logic is not None: - if self.field_name_condition_linking is not None: - raise SigmaConfigurationError( - "Field name condition logic is mutually exclusive to linking." - ) - else: - if self.field_name_condition_linking is None: - self.field_name_condition_linking = all - - if not isinstance(self.rule_conditions, list): - raise SigmaTypeError("Rule processing conditions must be provided as list") - for rule_condition in self.rule_conditions: - if not isinstance(rule_condition, RuleProcessingCondition): - raise SigmaTypeError( - f"Rule processing condition '{str(rule_condition)}' is not a RuleProcessingCondition" - ) - if not isinstance(self.detection_item_conditions, list): - raise SigmaTypeError("Detection item processing conditions must be provided as list") - for detection_item_condition in self.detection_item_conditions: - if not isinstance(detection_item_condition, DetectionItemProcessingCondition): - raise SigmaTypeError( - f"Detection item processing condition '{str(detection_item_condition)}' is not a DetectionItemProcessingCondition" - ) - if not isinstance(self.field_name_conditions, list): - raise SigmaTypeError("Field name processing conditions must be provided as list") - for field_name_condition in self.field_name_conditions: - if not isinstance(field_name_condition, FieldNameProcessingCondition): - raise SigmaTypeError( - f"Detection item processing condition '{str(field_name_condition)}' is not a FieldNameProcessingCondition" - ) + self._check_conditions( + self.detection_item_condition_logic, + "detection_item_condition_linking", + self.detection_item_conditions, + DetectionItemProcessingCondition, + "Detection item condition", + ) + self._check_conditions( + self.field_name_condition_logic, + "field_name_condition_linking", + self.field_name_conditions, + FieldNameProcessingCondition, + "Field name condition", + ) def apply( self, pipeline: "ProcessingPipeline", rule: Union[SigmaRule, SigmaCorrelationRule] diff --git a/tests/test_processing_pipeline.py b/tests/test_processing_pipeline.py index 7ea7f922..391f5eca 100644 --- a/tests/test_processing_pipeline.py +++ b/tests/test_processing_pipeline.py @@ -439,24 +439,24 @@ def test_processingitem_match_detection_item_negated_false( assert processing_item.match_detection_item(dummy_processing_pipeline, detection_item) -def test_processingitem_rule_condition_nolist(): - with pytest.raises(SigmaTypeError, match="Rule processing conditions"): +def test_processingitem_rule_condition_no_list_or_dict(): + with pytest.raises(SigmaTypeError, match="Rule conditions"): ProcessingItem( rule_conditions=LogsourceCondition(category="test"), transformation=SetStateTransformation("test", True), ) -def test_processingitem_detection_item_condition_nolist(): - with pytest.raises(SigmaTypeError, match="Detection item processing conditions"): +def test_processingitem_detection_item_condition_no_list_or_dict(): + with pytest.raises(SigmaTypeError, match="Detection item conditions"): ProcessingItem( detection_item_conditions=DetectionItemProcessingItemAppliedCondition("test"), transformation=SetStateTransformation("test", True), ) -def test_processingitem_field_name_condition_nolist(): - with pytest.raises(SigmaTypeError, match="Field name processing conditions"): +def test_processingitem_field_name_condition_no_list_or_dict(): + with pytest.raises(SigmaTypeError, match="Field name conditions"): ProcessingItem( field_name_conditions=IncludeFieldCondition(fields=["test"]), transformation=SetStateTransformation("test", True), @@ -471,6 +471,14 @@ def test_processingitem_wrong_rule_condition(): ) +def test_processingitem_wrong_rule_condition_dict(): + with pytest.raises(SigmaTypeError, match="RuleProcessingCondition"): + ProcessingItem( + rule_conditions={"cond": IncludeFieldCondition(fields=["testfield"])}, + transformation=SetStateTransformation("test", True), + ) + + def test_processingitem_rule_condition_linking_with_logic(): with pytest.raises(SigmaConfigurationError, match="Rule condition logic is mutually exclusive"): ProcessingItem( @@ -514,6 +522,18 @@ def test_processingitem_field_name_condition_linking_with_logic(): ) +def test_processingitem_rule_condition_logic_with_list(): + with pytest.raises(SigmaConfigurationError, match="mapping from identifiers to conditions"): + ProcessingItem( + rule_conditions=[ + RuleConditionTrue(dummy="test-true"), + RuleConditionFalse(dummy="test-false"), + ], + rule_condition_logic="cond1 or cond2", + transformation=SetStateTransformation("test", True), + ) + + def test_processingitem_wrong_detection_item_condition(): with pytest.raises(SigmaTypeError, match="DetectionItemProcessingCondition"): ProcessingItem(