diff --git a/sigma/correlations.py b/sigma/correlations.py index e6176a73..6e85fb3f 100644 --- a/sigma/correlations.py +++ b/sigma/correlations.py @@ -66,9 +66,10 @@ def from_dict( return cls(op=cond_op, count=cond_count) + def to_dict(self) -> dict: + return {self.op.name.lower(): self.count} + -# Calculates the number of seconds from a time specifier consisting of a number and a unit. -# The unit can be one of the following: s, m, h, d, w, M, y def parse_timespan(timespan: str) -> int: """ Parses a string representing a time span and returns the equivalent number of seconds. @@ -99,6 +100,32 @@ def parse_timespan(timespan: str) -> int: raise sigma_exceptions.SigmaTimespanError(f"Timespan '{ timespan }' is invalid.") +def seconds_to_timespan(seconds: int) -> str: + """ + Converts a number of seconds into a time span string. + + Args: + seconds (int): The number of seconds to convert. + + Returns: + str: The time span string. + """ + if seconds % 31556952 == 0: + return f"{ seconds // 31556952 }y" + elif seconds % 2629746 == 0: + return f"{ seconds // 2629746 }M" + elif seconds % 604800 == 0: + return f"{ seconds // 604800 }w" + elif seconds % 86400 == 0: + return f"{ seconds // 86400 }d" + elif seconds % 3600 == 0: + return f"{ seconds // 3600 }h" + elif seconds % 60 == 0: + return f"{ seconds // 60 }m" + else: + return f"{ seconds }s" + + @dataclass class SigmaCorrelationRule(SigmaRuleBase): type: SigmaCorrelationType = None @@ -242,3 +269,18 @@ def from_dict( errors=errors, **kwargs, ) + + def to_dict(self) -> dict: + d = super().to_dict() + dc = { + "type": self.type.name.lower(), + "rules": [rule.reference for rule in self.rules], + "timespan": seconds_to_timespan(self.timespan), + "group-by": self.group_by, + "ordered": self.ordered, + } + if self.condition is not None: + dc["condition"] = self.condition.to_dict() + d["correlation"] = dc + + return d diff --git a/tests/test_correlations.py b/tests/test_correlations.py index ebb6674c..9c842269 100644 --- a/tests/test_correlations.py +++ b/tests/test_correlations.py @@ -268,6 +268,32 @@ def test_correlation_without_condition(): ) +def test_correlation_to_dict(): + rule = SigmaCorrelationRule.from_dict( + { + "title": "Valid correlation", + "correlation": { + "type": "event_count", + "rules": "failed_login", + "group-by": "user", + "timespan": "10m", + "condition": {"gte": 10}, + }, + } + ) + assert rule.to_dict() == { + "title": "Valid correlation", + "correlation": { + "type": "event_count", + "rules": ["failed_login"], + "group-by": ["user"], + "timespan": "10m", + "ordered": False, + "condition": {"gte": 10}, + }, + } + + def test_correlation_condition(): cond = SigmaCorrelationCondition.from_dict({"gte": 10}) assert isinstance(cond, SigmaCorrelationCondition) @@ -297,3 +323,8 @@ def test_correlation_condition_invalid_count(): match="'test' is no valid Sigma correlation condition count", ): SigmaCorrelationCondition.from_dict({"gte": "test"}) + + +def test_correlation_condition_to_dict(): + cond = SigmaCorrelationCondition.from_dict({"gte": 10}) + assert cond.to_dict() == {"gte": 10}