Skip to content

Commit

Permalink
Convert correlation rules to dict
Browse files Browse the repository at this point in the history
  • Loading branch information
thomaspatzke committed Oct 29, 2023
1 parent 42a3d0c commit 37910bf
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 2 deletions.
46 changes: 44 additions & 2 deletions sigma/correlations.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,10 @@ def from_dict(

return cls(op=cond_op, count=cond_count)

def to_dict(self) -> dict:
return {self.op.name.lower(): self.count}


# Calculates the number of seconds from a time specifier consisting of a number and a unit.
# The unit can be one of the following: s, m, h, d, w, M, y
def parse_timespan(timespan: str) -> int:
"""
Parses a string representing a time span and returns the equivalent number of seconds.
Expand Down Expand Up @@ -99,6 +100,32 @@ def parse_timespan(timespan: str) -> int:
raise sigma_exceptions.SigmaTimespanError(f"Timespan '{ timespan }' is invalid.")


def seconds_to_timespan(seconds: int) -> str:
"""
Converts a number of seconds into a time span string.
Args:
seconds (int): The number of seconds to convert.
Returns:
str: The time span string.
"""
if seconds % 31556952 == 0:
return f"{ seconds // 31556952 }y"
elif seconds % 2629746 == 0:
return f"{ seconds // 2629746 }M"
elif seconds % 604800 == 0:
return f"{ seconds // 604800 }w"
elif seconds % 86400 == 0:
return f"{ seconds // 86400 }d"
elif seconds % 3600 == 0:
return f"{ seconds // 3600 }h"
elif seconds % 60 == 0:
return f"{ seconds // 60 }m"
else:
return f"{ seconds }s"


@dataclass
class SigmaCorrelationRule(SigmaRuleBase):
type: SigmaCorrelationType = None
Expand Down Expand Up @@ -242,3 +269,18 @@ def from_dict(
errors=errors,
**kwargs,
)

def to_dict(self) -> dict:
d = super().to_dict()
dc = {
"type": self.type.name.lower(),
"rules": [rule.reference for rule in self.rules],
"timespan": seconds_to_timespan(self.timespan),
"group-by": self.group_by,
"ordered": self.ordered,
}
if self.condition is not None:
dc["condition"] = self.condition.to_dict()
d["correlation"] = dc

return d
31 changes: 31 additions & 0 deletions tests/test_correlations.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,32 @@ def test_correlation_without_condition():
)


def test_correlation_to_dict():
rule = SigmaCorrelationRule.from_dict(
{
"title": "Valid correlation",
"correlation": {
"type": "event_count",
"rules": "failed_login",
"group-by": "user",
"timespan": "10m",
"condition": {"gte": 10},
},
}
)
assert rule.to_dict() == {
"title": "Valid correlation",
"correlation": {
"type": "event_count",
"rules": ["failed_login"],
"group-by": ["user"],
"timespan": "10m",
"ordered": False,
"condition": {"gte": 10},
},
}


def test_correlation_condition():
cond = SigmaCorrelationCondition.from_dict({"gte": 10})
assert isinstance(cond, SigmaCorrelationCondition)
Expand Down Expand Up @@ -297,3 +323,8 @@ def test_correlation_condition_invalid_count():
match="'test' is no valid Sigma correlation condition count",
):
SigmaCorrelationCondition.from_dict({"gte": "test"})


def test_correlation_condition_to_dict():
cond = SigmaCorrelationCondition.from_dict({"gte": 10})
assert cond.to_dict() == {"gte": 10}

0 comments on commit 37910bf

Please sign in to comment.