Skip to content

Commit

Permalink
Fixed generic fieldmappings overriding table fieldmappings, fixed for…
Browse files Browse the repository at this point in the history
…matting
  • Loading branch information
slincoln-aiq committed Sep 20, 2024
1 parent e9d522d commit ade0af7
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 104 deletions.
8 changes: 5 additions & 3 deletions sigma/pipelines/microsoft365defender/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from .microsoft365defender import microsoft_365_defender_pipeline
from .microsoft365defender import microsoft_365_defender_pipeline, microsoft_xdr_pipeline

pipelines = {
"microsoft_365_defender_pipeline": microsoft_365_defender_pipeline, # TODO: adapt identifier to something approproiate
}
"microsoft_365_defender_pipeline": microsoft_365_defender_pipeline,
"microsoft_xdr_pipeline": microsoft_xdr_pipeline,
}
5 changes: 4 additions & 1 deletion sigma/pipelines/microsoft365defender/errors.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
from sigma.processing.transformations import DetectionItemFailureTransformation, SigmaTransformationError
from sigma.processing.transformations import (
DetectionItemFailureTransformation,
SigmaTransformationError,
)
from sigma.rule import SigmaDetectionItem


Expand Down
10 changes: 5 additions & 5 deletions sigma/pipelines/microsoft365defender/finalization.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from dataclasses import dataclass, field
from typing import Union, List
from typing import List, Union

from sigma.processing.finalization import Finalizer

Expand All @@ -12,21 +12,21 @@ class Microsoft365DefenderTableFinalizer(Finalizer):
will keep individual queries separate and add the table name as a prefix to each query, per ordering in the
pipeline's state 'query_table' key which is appended to for each rule by set for each rule by the
SetQueryTableStateTransformation transformation.
A custom table name can be specified in the finalizer, otherwise the table name will be selected based on the category of the rule.
"""

table_names: Union[str, List[str]] = field(default_factory=list)

def apply(self, pipeline: "sigma.processing.pipeline.ProcessingPipeline", queries: List[str]) -> List[str]:
def apply(self, pipeline: "sigma.processing.pipeline.ProcessingPipeline", queries: List[str]) -> List[str]: # type: ignore # noqa: F821
if isinstance(self.table_names, str):
self.table_names = [self.table_names] * len(queries)

for i, query in enumerate(queries):
if self.table_names:
queries[i] = f"{self.table_names[i]}\n| where {query}"
elif 'query_table' in pipeline.state:
queries[i] = f"{pipeline.state['query_table'][i]}\n| where {query}"
elif "query_table" in pipeline.state:
queries[i] = f"{pipeline.state['query_table']}\n| where {query}"
else:
queries[i] = f"search {query}"
return queries
94 changes: 62 additions & 32 deletions sigma/pipelines/microsoft365defender/microsoft365defender.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
from typing import Optional

from sigma.processing.conditions import (
DetectionItemProcessingItemAppliedCondition,
ExcludeFieldCondition,
IncludeFieldCondition,
LogsourceCondition,
RuleProcessingItemAppliedCondition,
RuleProcessingStateCondition,
)
from sigma.processing.pipeline import ProcessingItem, ProcessingPipeline
from sigma.processing.transformations import (
Expand Down Expand Up @@ -52,29 +55,24 @@ def create_xdr_schema() -> MicrosoftXDRSchema:
parent_image_field_mapping = {"ParentImage": "InitiatingProcessParentFileName"}


## ProcessingItems to set state key 'query_table' to use in backend
## i.e. $QueryTable$ | $rest_of_query$
query_table_proc_items = [
ProcessingItem(
identifier=f"microsoft_xdr_set_query_table_{category}",
transformation=SetQueryTableStateTransformation(table_name),
rule_conditions=[CATEGORY_TO_CONDITIONS_MAPPINGS[category]],
)
for category, table_name in CATEGORY_TO_TABLE_MAPPINGS.items()
]

## Fieldmappings
fieldmappings_proc_item = ProcessingItem(
identifier="microsoft_xdr_fieldmappings",
transformation=DynamicFieldMappingTransformation(MICROSOFT_XDR_FIELD_MAPPINGS),
identifier="microsoft_xdr_table_fieldmappings",
transformation=DynamicFieldMappingTransformation(MICROSOFT_XDR_FIELD_MAPPINGS.table_mappings),
)

## Generic Fielp Mappings, keep this last
## Exclude any fields already mapped. For example, if process_creation events ProcessId has already
## been mapped to the same field name (ProcessId), we don't to remap it to InitiatingProcessId
## Exclude any fields already mapped, e.g. if a table mapping has been applied.
# This will fix the case where ProcessId is usually mapped to InitiatingProcessId, EXCEPT for the DeviceProcessEvent table where it stays as ProcessId.
# So we can map ProcessId to ProcessId in the DeviceProcessEvents table mapping, and prevent the generic mapping to InitiatingProcessId from being applied
# by adding a detection item condition that the table field mappings have been applied

generic_field_mappings_proc_item = ProcessingItem(
identifier="microsoft_xdr_fieldmappings_generic",
transformation=GenericFieldMappingTransformation(MICROSOFT_XDR_FIELD_MAPPINGS),
identifier="microsoft_xdr_generic_fieldmappings",
transformation=GenericFieldMappingTransformation(MICROSOFT_XDR_FIELD_MAPPINGS.generic_mappings),
detection_item_conditions=[DetectionItemProcessingItemAppliedCondition(f"microsoft_xdr_table_fieldmappings")],
detection_item_condition_linking=any,
detection_item_condition_negation=True,
)


Expand Down Expand Up @@ -174,25 +172,54 @@ def create_xdr_schema() -> MicrosoftXDRSchema:
)
]

field_error_proc_items = [

def get_valid_fields(table_name):
return (
list(MICROSOFT_XDR_SCHEMA.tables[table_name].fields.keys())
+ list(MICROSOFT_XDR_FIELD_MAPPINGS.table_mappings.get(table_name, {}).keys())
+ list(MICROSOFT_XDR_FIELD_MAPPINGS.generic_mappings.keys())
+ ["Hashes"]
)


field_error_proc_items = []

for table_name in MICROSOFT_XDR_SCHEMA.tables.keys():
valid_fields = get_valid_fields(table_name)

field_error_proc_items.append(
ProcessingItem(
identifier=f"microsoft_xdr_unsupported_fields_{table_name}",
transformation=InvalidFieldTransformation(
f"Please use valid fields for the {table_name} table, or the following fields that have keymappings in this "
f"pipeline:\n{', '.join(sorted(set(valid_fields)))}"
),
field_name_conditions=[ExcludeFieldCondition(fields=valid_fields)],
rule_conditions=[
RuleProcessingItemAppliedCondition("microsoft_xdr_set_query_table"),
RuleProcessingStateCondition("query_table", table_name),
],
rule_condition_linking=all,
)
)

# Add a catch-all error for custom table names
field_error_proc_items.append(
ProcessingItem(
identifier=f"microsoft_xdr_unsupported_fields_{category}",
identifier="microsoft_xdr_unsupported_fields_custom",
transformation=InvalidFieldTransformation(
f"Please use valid fields for the {table_name} table, or the following fields that have keymappings in this "
f"pipeline:\n"
f"{', '.join(sorted(set(MICROSOFT_XDR_FIELD_MAPPINGS.table_mappings.get(table_name, {}).keys()).union(MICROSOFT_XDR_FIELD_MAPPINGS.generic_mappings.keys()).union({'Hashes'})))}"
"Invalid field name for the custom table. Please ensure you're using valid fields for your custom table."
),
field_name_conditions=[
ExcludeFieldCondition(
fields=MICROSOFT_XDR_SCHEMA.get_valid_fields(table_name)
+ list(MICROSOFT_XDR_FIELD_MAPPINGS.generic_mappings.keys())
+ ["Hashes"]
)
ExcludeFieldCondition(fields=list(MICROSOFT_XDR_FIELD_MAPPINGS.generic_mappings.keys()) + ["Hashes"])
],
rule_conditions=[CATEGORY_TO_CONDITIONS_MAPPINGS[category]],
rule_conditions=[
RuleProcessingItemAppliedCondition("microsoft_xdr_set_query_table"),
RuleProcessingStateCondition("query_table", None),
],
rule_condition_linking=all,
)
for category, table_name in CATEGORY_TO_TABLE_MAPPINGS.items()
]
)


def microsoft_365_defender_pipeline(
Expand Down Expand Up @@ -222,7 +249,10 @@ def microsoft_xdr_pipeline(
"""

pipeline_items = [
*query_table_proc_items,
ProcessingItem(
identifier="microsoft_xdr_set_query_table",
transformation=SetQueryTableStateTransformation(query_table),
),
fieldmappings_proc_item,
generic_field_mappings_proc_item,
*replacement_proc_items,
Expand All @@ -238,5 +268,5 @@ def microsoft_xdr_pipeline(
priority=10,
items=pipeline_items,
allowed_backends=frozenset(["kusto"]),
finalizers=[Microsoft365DefenderTableFinalizer(table_names=query_table)],
finalizers=[Microsoft365DefenderTableFinalizer()],
)
105 changes: 42 additions & 63 deletions sigma/pipelines/microsoft365defender/transformations.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@
from sigma.processing.transformations import (
DetectionItemTransformation,
FieldMappingTransformation,
SigmaTransformationError,
Transformation,
ValueTransformation,
)
from sigma.rule import SigmaDetection, SigmaDetectionItem, SigmaString
from sigma.types import SigmaType

from .errors import InvalidHashAlgorithmError
from .mappings import CATEGORY_TO_TABLE_MAPPINGS
from .mappings import CATEGORY_TO_TABLE_MAPPINGS, MICROSOFT_XDR_FIELD_MAPPINGS


## Custom DetectionItemTransformation to split domain and user, if applicable
Expand Down Expand Up @@ -165,83 +166,61 @@ class SetQueryTableStateTransformation(Transformation):

val: Any = None

def apply(
self, pipeline: "sigma.processing.pipeline.ProcessingPipeline", rule: "sigma.rule.SigmaRule" # noqa: F821
) -> None: # noqa: F821
def apply(self, pipeline: "ProcessingPipeline", rule: "SigmaRule") -> None: # type: ignore # noqa: F821
super().apply(pipeline, rule)
if "query_table" in pipeline.state:
table_name = pipeline.state["query_table"]
if self.val:
table_name = self.val
else:
category = rule.logsource.category
table_name = CATEGORY_TO_TABLE_MAPPINGS.get(category, self.val)
pipeline.state["query_table"] = pipeline.state.get("query_table", []) + [table_name]


class MicrosoftXDRFieldMapping(FieldMappingTransformation):
def __init__(self):
super().__init__({})

def apply(self, pipeline, rule, detection_item=None, state=None):
if state and "query_table" in state:
table_name = state["query_table"]
else:
# Fallback to category-based mapping if query_table is not set
category = rule.logsource.category
table_name = CATEGORY_TO_TABLE_MAPPINGS.get(category)

if table_name:
field = detection_item.field if detection_item else None
if field:
return self.get_field_mapping(table_name, field)
return None
if isinstance(table_name, list):
table_name = table_name[0] # Use the first table if it's a list
pipeline.state["query_table"] = table_name
else:
raise SigmaTransformationError(
f"Unable to determine table name for category: {category}, category is not yet supported by the pipeline. Please provide the 'query_table' parameter to the pipeline instead."
)


@dataclass
class DynamicFieldMappingTransformation(FieldMappingTransformation):
def __init__(self, field_mappings):
super().__init__({})
self.field_mappings = field_mappings

def apply_detection_item(self, detection_item: SigmaDetectionItem, pipeline: "ProcessingPipeline", rule: "SigmaRule") -> Optional[SigmaDetectionItem]: # type: ignore # noqa: F821
query_table = pipeline.state.get("query_table", [])[-1] if pipeline.state.get("query_table") else None
if not query_table:
# Fallback to category-based mapping if query_table is not set
category = rule.logsource.category
query_table = CATEGORY_TO_TABLE_MAPPINGS.get(category)

if query_table:
table_mappings = self.field_mappings.table_mappings.get(query_table, {})
if detection_item.field in table_mappings:
detection_item.field = table_mappings[detection_item.field]
elif detection_item.field in self.field_mappings.generic_mappings:
detection_item.field = self.field_mappings.generic_mappings[detection_item.field]
"""
Dynamically sets the mapping dictionary based on the pipeline state or rule's category.
"""

return detection_item
def set_dynamic_mapping(self, pipeline):
"""
Set the mapping dynamically based on the pipeline state 'query_table' or the rule's logsource category.
"""

def apply(self, pipeline: "ProcessingPipeline", rule: "SigmaRule"): # type: ignore # noqa: F821
if isinstance(rule, "SigmaRule"):
for detection in rule.detection.detections.values():
self.apply_detection(detection, pipeline, rule)
# 1. If the pipeline's state has 'query_table', use that
if "query_table" in pipeline.state:
query_table = pipeline.state["query_table"]
self.mapping = MICROSOFT_XDR_FIELD_MAPPINGS.table_mappings.get(query_table, {})

def apply_detection(self, detection: SigmaDetection, pipeline: "ProcessingPipeline", rule: "SigmaRule"): # type: ignore # noqa: F821
for i, detection_item in enumerate(detection.detection_items):
if isinstance(detection_item, SigmaDetection): # recurse into nested detection items
self.apply_detection(detection_item, pipeline, rule)
else:
if self.processing_item is None or self.processing_item.match_detection_item(pipeline, detection_item):
r = self.apply_detection_item(detection_item, pipeline, rule)
if r is not None:
detection.detection_items[i] = r
def apply(
self,
pipeline: "sigma.processing.pipeline.ProcessingPipeline", # noqa: F821 # type: ignore
rule: Union["SigmaRule", "SigmaCorrelationRule"], # noqa: F821 # type: ignore
) -> None:
"""Apply dynamic mapping before the field name transformations."""
self.set_dynamic_mapping(pipeline) # Dynamically update the mapping
super().apply(pipeline, rule) # Call parent method to continue the transformation process


class GenericFieldMappingTransformation(FieldMappingTransformation):
def __init__(self, field_mappings):
super().__init__({})
self.field_mappings = field_mappings
"""
Transformation for applying generic field mappings after table-specific mappings.
"""

def apply_detection_item(self, detection_item: SigmaDetectionItem, pipeline: "ProcessingPipeline", rule: "SigmaRule") -> Optional[SigmaDetectionItem]: # type: ignore # noqa: F821
query_table = pipeline.state.get("query_table", [])[-1] # Get the last added table name
table_mappings = self.field_mappings.table_mappings.get(query_table, {})
def __init__(self, generic_mappings):
super().__init__(generic_mappings)

if detection_item.field not in table_mappings:
detection_item.field = self.field_mappings.generic_mappings.get(detection_item.field, detection_item.field)
def apply_detection_item(
self, detection_item: SigmaDetectionItem
) -> Optional[Union[SigmaDetectionItem, SigmaString]]:
if detection_item.field in self.mapping:
detection_item.field = self.mapping[detection_item.field]
return detection_item

0 comments on commit ade0af7

Please sign in to comment.