diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 2ea58bc..0b9d0b8 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -23,8 +23,8 @@ jobs: [ "$(poetry version -s)" == "${GITHUB_REF#refs/tags/v}" ] - name: Install dependencies run: poetry install - - name: Run tests - run: poetry run pytest + #- name: Run tests + # run: poetry run pytest - name: Build packages run: poetry build - name: Configure Poetry diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 5f6f336..f9396e2 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -24,18 +24,3 @@ jobs: cache: poetry - name: Install dependencies run: poetry install - - name: Run tests - run: poetry run pytest --cov=sigma --cov-report term --cov-report xml:cov.xml -vv - - name: Store coverage for badge - if: ${{ runner.os == 'Linux' }} - run: poetry run python print-coverage.py >> $GITHUB_ENV - - name: Create coverage badge - if: ${{ github.repository == 'AttackIQ/pySigma-backend-microsoft365defender' && github.event_name == 'push' && runner.os == 'Linux' }} - uses: schneegans/dynamic-badges-action@v1.1.0 - with: - auth: ${{ secrets.GIST_SECRET }} - gistID: 9c0879725c7f94387801390bbb0ac8d6 - filename: slincoln-aiq-pySigma-backend-microsoft365defender.json - label: Coverage - message: ${{ env.COVERAGE }} - color: ${{ env.COVERAGE_COLOR }} diff --git a/README.md b/README.md index 5d87d2a..6f7f4f0 100644 --- a/README.md +++ b/README.md @@ -1,196 +1,4 @@ -# 🛡️ pySigma Kusto Query Language (KQL) Backend +# 🛡️ pySigma Microsoft 365 Defender Backend -![Tests](https://github.com/AttackIQ/pySigma-backend-microsoft365defender/actions/workflows/test.yml/badge.svg) -![Coverage Badge](https://img.shields.io/endpoint?url=https://gist.githubusercontent.com/slincoln-aiq/9c0879725c7f94387801390bbb0ac8d6/raw/slincoln-aiq-pySigma-backend-microsoft365defender.json) -![Status](https://img.shields.io/badge/Status-pre--release-orange) - -## 📖 Overview - -This backend for [pySigma](https://github.com/SigmaHQ/pySigma) enables the transformation of Sigma Rules into queries in [Kusto Query Language (KQL)](https://learn.microsoft.com/en-us/kusto/query/?view=microsoft-fabric) for products such as [Microsoft 365 Defender Advanced Hunting Queries](https://learn.microsoft.com/en-us/microsoft-365/security/defender/advanced-hunting-query-language?view=o365-worldwide), [Azure Sentinel Queries](https://learn.microsoft.com/en-us/azure/sentinel/kusto-overview), and more! - -This project was formally named pySigma Microsoft 365 Defender Backend, or pySigma-microsoft365defender-backend. - -### 🔑 Key Features -- Provides `sigma.backends.kusto` package with `KustoBackend` class -- Includes `microsoft_365_defender_pipeline` and `sentinelasim_pipeline` for field renames and error handling -- Supports output format: Query string for Advanced Hunting Queries in KQL - -### 🧑‍💻 Maintainer -- [Stephen Lincoln](https://github.com/slincoln-aiq) via [AttackIQ](https://github.com/AttackIQ) - -## 🚀 Installation - -### 📦 Using pip - -```bash -pip install pysigma-backend-kusto -``` - - -### 🔌 Using pySigma Plugins (requires pySigma >= 0.10.0) - -```python -from sigma.plugins import SigmaPluginDirectory # Requires pySigma >= 0.10.0 - -plugins = SigmaPluginDirectory.default_plugin_directory() -plugins.get_plugin_by_id("kusto").install() -``` - - -## 🔧 Dependencies -- pySigma >= v0.10.0 - -## 📘 Usage - -### 🖥️ sigma-cli - -Use with `sigma-cli` per [typical sigma-cli usage](https://github.com/SigmaHQ/sigma-cli#usage): - -```bash -sigma convert -t kusto -p microsoft_365_defender -f default -s ~/sigma/rules -``` - -### 🐍 Python Script - -Use the backend and pipeline in a standalone Python script. Note, the backend automatically applies the pipeline, but -you can manually add it if you would like. - -```python -from sigma.rule import SigmaRule -from sigma.backends.kusto import KustoBackend -from sigma.pipelines.microsoft365defender import microsoft_365_defender_pipeline - -# Define an example rule as a YAML str -sigma_rule = SigmaRule.from_yaml(""" - title: Mimikatz CommandLine - status: test - logsource: - category: process_creation - product: windows - detection: - sel: - CommandLine|contains: mimikatz.exe - condition: sel -""") -# Create backend, which automatically adds the pipeline -kusto_backend = KustoBackend() - -# Or apply the pipeline manually -pipeline = microsoft_365_defender_pipeline() -pipeline.apply(sigma_rule) - -# Convert the rule -print(sigma_rule.title + " KQL Query: \n") -print(kusto_backend.convert_rule(sigma_rule)[0]) -``` - -Output: - -``` -Mimikatz CommandLine KQL Query: - -DeviceProcessEvents -| where ProcessCommandLine contains "mimikatz.exe" -```` - -## 🛠️ Advanced Features - -### 🔄 Pipeline & Backend Args (New in 0.2.0) - -- `transform_parent_image`: Controls ParentImage field mapping behavior - - When set to `True` (default), maps ParentImage to InitiatingProcessParentFileName - - When set to `False`, maps ParentImage to InitiatingProcessFileName - - Useful for adjusting field mappings based on specific rule requirements - - Example usage: - -```python -from sigma.pipelines.microsoft365defender import microsoft_365_defender_pipeline -pipeline = microsoft_365_defender_pipeline(transform_parent_image=False) -``` - -This argument allows fine-tuning of the ParentImage field mapping, which can be crucial for accurate rule conversion in certain scenarios. By default, it follows the behavior of mapping ParentImage to the parent process name, but setting it to `False` allows for mapping to the initiating process name instead. - -### 🗃️ Custom Table Names (New in 0.3.0) (Experimental) - -- `query_table`: Allows user to override table mappings and set their own table name - - Experimental feature, implementation is subject to change - - Example usage: - -via YAML -```YAML -# test_table_name_pipeline.yml -transformations: -- id: test_name_name - type: set_state - key: "query_table" - val: ["MyTestTable"] -``` -```bash -sigma convert -t kusto -p microsoft_365_defender -p test_table_name_pipeline.yml test_rule.yml -``` - -via Python - -```python -from sigma.pipelines.microsoft365defender import microsoft_365_defender_pipeline -my_pipeline = microsoft_365_defender_pipeline(query_table="MyTestTable") # Or ["MyTestTable"] -``` - -## 📊 Rule Support - -### 🖥️ Supported Categories (product=windows) -- process_creation -- image_load -- network_connection -- file_access, file_change, file_delete, file_event, file_rename -- registry_add, registry_delete, registry_event, registry_set - -## 🔍 Processing Pipeline - -The `microsoft_365_defender_pipeline` includes custom `ProcessingPipeline` `Transformation` classes: - -- 🔀 ParentImageValueTransformation - - Extracts the parent process name from the Sysmon ParentImage field - - Maps to InitiatingProcessParentFileName (as InitiatingProcessParentFolderPath is not available) - - Use before mapping ParentImage to InitiatingProcessFileName - -- 🔢 SplitDomainUserTransformation - - Splits the User field into separate domain and username fields - - Handles Sysmon `User` field containing both domain and username - - Creates new SigmaDetectionItems for Domain and Username - - Use with field_name_condition for username fields - -- 🔐 HashesValuesTransformation - - Processes 'Hashes' field values in 'algo:hash_value' format - - Creates new SigmaDetectionItems for each hash type - - Infers hash type based on length if not specified - - Use with field_name_condition for the Hashes field - -- 📝 RegistryActionTypeValueTransformation - - Adjusts registry ActionType values to match Microsoft DeviceRegistryEvents table - - Ensures compatibility between Sysmon and Microsoft 365 Defender schemas - -- ❌ InvalidFieldTransformation - - Extends DetectionItemFailureTransformation - - Includes the field name in the error message - - Helps identify unsupported or invalid fields in the rule - -- 🏷️ SetQueryTableStateTransformation - - Appends rule query table to pipeline state query_table key - - Used to set custom table names for queries - -The pipeline also includes a custom `Finalizer`: - -- 📊 Microsoft365DefenderTableFinalizer - - Adds the table name as a prefix to each query - - Uses custom table names if specified, otherwise selects based on rule category - - Keeps individual queries separate instead of combining them - - Allows for fine-grained control over query table selection - -## ⚠️ Limitations and Constraints - -- Works only for `product=windows` and listed rule categories -- Unsupported fields may cause exceptions (improvements in progress) - -For more detailed information, please refer to the full documentation. +This project has been renamed to pySigma Kusto Backend. This branch is just a stub to release the old `pySigma-backend-microsoft365defender` which automatically installs `pySigma-kusto-backend`. diff --git a/pyproject.toml b/pyproject.toml index afe6801..57591fa 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,26 +1,16 @@ [tool.poetry] -name = "pySigma-backend-kusto" -version = "0.3.1" -description = "pySigma Kusto backend" +name = "pySigma-backend-microsoft365defender" +version = "0.3.1s" +description = "This package has been renamed to pySigma-backend-kusto. Install pySigma-backend-kusto instead." authors = ["Stephen Lincoln "] license = "LGPL-3.0-only" repository = "https://github.com/AttackIQ/pySigma-backend-kusto" -packages = [ - { include = "sigma" } -] +packages = [] [tool.poetry.dependencies] python = "^3.8" -pysigma = ">= 0.10.0" -certifi = ">=2023.07.22" - -[tool.poetry.dev-dependencies] - -[tool.poetry.group.dev.dependencies] -pytest = "^7.2.1" -pytest-cov = "^4.0.0" -coverage = "^7.2.1" +pySigma-backend-kusto = ">=0.2.6" [build-system] requires = ["poetry-core>=1.0.0"] -build-backend = "poetry.core.masonry.api" +build-backend = "poetry.core.masonry.api" \ No newline at end of file diff --git a/sigma/backends/kusto/__init__.py b/sigma/backends/kusto/__init__.py deleted file mode 100644 index b9aa6c4..0000000 --- a/sigma/backends/kusto/__init__.py +++ /dev/null @@ -1,6 +0,0 @@ -from .kusto import KustoBackend -# TODO: add all backend classes that should be exposed to the user of your backend in the import statement above. - -backends = { # Mapping between backend identifiers and classes. This is used by the pySigma plugin system to recognize backends and expose them with the identifier. - "kusto": KustoBackend, -} \ No newline at end of file diff --git a/sigma/backends/kusto/kusto.py b/sigma/backends/kusto/kusto.py deleted file mode 100644 index 8520f64..0000000 --- a/sigma/backends/kusto/kusto.py +++ /dev/null @@ -1,216 +0,0 @@ -import re -from typing import ClassVar, Dict, Tuple, Pattern, Any, Union, Optional - -from sigma.conditions import ConditionItem, ConditionAND, ConditionOR, ConditionNOT, ConditionFieldEqualsValueExpression -from sigma.conversion.base import TextQueryBackend -from sigma.conversion.deferred import DeferredQueryExpression -from sigma.conversion.state import ConversionState -from sigma.processing.pipeline import ProcessingPipeline -from sigma.rule import SigmaRule -from sigma.types import SigmaCompareExpression, SigmaString, SpecialChars - - - -class KustoBackend(TextQueryBackend): - """Microsoft 365 Defender KQL Backend. """ - - # The backend generates grouping if required - name: ClassVar[str] = "Kusto backend" - identifier: ClassVar[str] = "kusto" - formats: Dict[str, str] = { - "default": "Kusto Query Language search strings", - } - - requires_pipeline: bool = False # m365 pipeline is automatically applied - - # Operator precedence - parenthesize = True - precedence: ClassVar[Tuple[ConditionItem, ConditionItem, ConditionItem]] = (ConditionNOT, ConditionAND, ConditionOR) - group_expression: ClassVar[ - str] = "({expr})" # Expression for precedence override grouping as format string with {expr} placeholder - # Generated query tokens - token_separator: str = " " # separator inserted between all boolean operators - or_token: ClassVar[str] = "or" - and_token: ClassVar[str] = "and" - not_token: ClassVar[str] = "not" - eq_token: ClassVar[str] = " =~ " # Token inserted between field and value (without separator) - - # String output - ## Fields - ### Quoting - field_quote: ClassVar[ - str] = "'" # Character used to quote field characters if field_quote_pattern matches (or not, depending on field_quote_pattern_negation). No field name quoting is done if not set. - field_quote_pattern: ClassVar[Pattern] = re.compile( - "^\\w+$") # Quote field names if this pattern (doesn't) matches, depending on field_quote_pattern_negation. Field name is always quoted if pattern is not set. - field_quote_pattern_negation: ClassVar[ - bool] = True # Negate field_quote_pattern result. Field name is quoted if pattern doesn't matches if set to True (default). - - ### Escaping - field_escape: ClassVar[str] = "" # Character to escape particular parts defined in field_escape_pattern. - field_escape_quote: ClassVar[bool] = True # Escape quote string defined in field_quote - field_escape_pattern: ClassVar[Pattern] = re.compile( - "\\s") # All matches of this pattern are prepended with the string contained in field_escape. - - ## Values - str_quote: ClassVar[str] = '"' # string quoting character (added as escaping character) - escape_char: ClassVar[str] = "\\" # Escaping character for special characrers inside string - wildcard_multi: ClassVar[str] = "*" # Character used as multi-character wildcard - wildcard_single: ClassVar[str] = "*" # Character used as single-character wildcard - add_escaped: ClassVar[str] = "\\" # Characters quoted in addition to wildcards and string quote - filter_chars: ClassVar[str] = "" # Characters filtered - bool_values: ClassVar[Dict[bool, str]] = { # Values to which boolean values are mapped. - True: "true", - False: "false", - } - - # String matching operators. if none is appropriate eq_token is used. - startswith_expression: ClassVar[str] = "{field} startswith {value}" - endswith_expression: ClassVar[str] = "{field} endswith {value}" - contains_expression: ClassVar[str] = "{field} contains {value}" - wildcard_match_expression: ClassVar[str] = None # Special expression if wildcards can't be matched with the eq_token operator - - # Regular expressions - re_expression: ClassVar[ - str] = "{field} matches regex \"{regex}\"" # Regular expression query as format string with placeholders {field} and {regex} - re_escape_char: ClassVar[str] = "\\" # Character used for escaping in regular expressions - re_escape: ClassVar[Tuple[str]] = () # List of strings that are escaped - re_escape_escape_char: bool = True # If True, the escape character is also escaped - - # cidr expressions - cidr_wildcard: ClassVar[str] = "*" # Character used as single wildcard - cidr_expression: ClassVar[ - str] = "ipv4_is_in_range({field}, \"{value}\")" # CIDR expression query as format string with placeholders {field} = {value} - cidr_in_list_expression: ClassVar[ - str] = "ipv4_is_in_any_range({field}, \"{value}\")" # CIDR expression query as format string with placeholders {field} = in({list}) - - # Numeric comparison operators - compare_op_expression: ClassVar[ - str] = "{field} {operator} {value}" # Compare operation query as format string with placeholders {field}, {operator} and {value} - # Mapping between CompareOperators elements and strings used as replacement for {operator} in compare_op_expression - compare_operators: ClassVar[Dict[SigmaCompareExpression.CompareOperators, str]] = { - SigmaCompareExpression.CompareOperators.LT: "<", - SigmaCompareExpression.CompareOperators.LTE: "<=", - SigmaCompareExpression.CompareOperators.GT: ">", - SigmaCompareExpression.CompareOperators.GTE: ">=", - } - - # Null/None expressions - field_null_expression: ClassVar[ - str] = "isnull({field})" # Expression for field has null value as format string with {field} placeholder for field name - - # Field value in list, e.g. "field in (value list)" or "field containsall (value list)" - convert_or_as_in: ClassVar[bool] = True # Convert OR as in-expression - convert_and_as_in: ClassVar[bool] = True # Convert AND as in-expression - in_expressions_allow_wildcards: ClassVar[ - bool] = True # Values in list can contain wildcards. If set to False (default) only plain values are converted into in-expressions. - field_in_list_expression: ClassVar[ - str] = "{field} {op} ({list})" # Expression for field in list of values as format string with placeholders {field}, {op} and {list} - or_in_operator: ClassVar[ - str] = "in~" # Operator used to convert OR into in-expressions. Must be set if convert_or_as_in is set - and_in_operator: ClassVar[ - str] = "has_all" # Operator used to convert AND into in-expressions. Must be set if convert_and_as_in is set - list_separator: ClassVar[str] = ", " # List element separator - - # Value not bound to a field - unbound_value_str_expression: ClassVar[ - str] = '{value}' # Expression for string value not bound to a field as format string with placeholder {value} - unbound_value_num_expression: ClassVar[ - str] = '{value}' # Expression for number value not bound to a field as format string with placeholder {value} - unbound_value_re_expression: ClassVar[ - str] = '_=~{value}' # Expression for regular expression not bound to a field as format string with placeholder {value} - - # Query finalization: appending and concatenating deferred query part - deferred_start: ClassVar[str] = "\n| " # String used as separator between main query and deferred parts - deferred_separator: ClassVar[str] = "\n| " # String used to join multiple deferred query parts - deferred_only_query: ClassVar[str] = "*" # String used as query if final query only contains deferred expression - - # We use =~ for eq_token so everything is case insensitive. But this cannot be used with ints/numbers in queries - # So we can define a new token to use for SigmaNumeric types and override convert_condition_field_eq_val_num - # to use it - num_eq_token: ClassVar[str] = " == " - - # Override methods - - # For numeric values, need == instead of =~ - def convert_condition_field_eq_val_num(self, cond: ConditionFieldEqualsValueExpression, state: ConversionState) -> \ - Union[str, DeferredQueryExpression]: - """Conversion of field = number value expressions""" - try: - return self.escape_and_quote_field(cond.field) + self.num_eq_token + str(cond.value) - except TypeError: # pragma: no cover - raise NotImplementedError("Field equals numeric value expressions are not supported by the backend.") - - def convert_condition_as_in_expression(self, - cond: Union[ConditionOR, ConditionAND], - state: ConversionState) -> Union[str, DeferredQueryExpression]: - """Overridden method for conversion of field in value list conditions. - KQL doesn't really use wildcards, so if we have an 'as_in' condition where one or more of the values has a wildcard, - we can still use the as_in condition, then append on the wildcard value(s) with a startswith, endswith, or contains - expression - """ - - field = self.escape_and_quote_field(cond.args[0].field) - op1 = self.or_in_operator if isinstance(cond, ConditionOR) else self.and_in_operator - op2 = self.or_token if isinstance(cond, ConditionOR) else self.and_token - list_nonwildcard = self.list_separator.join([ - self.convert_value_str(arg.value, state) - for arg in cond.args - if (isinstance(arg.value, SigmaString) and not arg.value.contains_special()) or not isinstance(arg.value, - SigmaString) - ]) - list_wildcards = [ - arg.value for arg in cond.args if isinstance(arg.value, SigmaString) and arg.value.contains_special() - ] - as_in_expr = "" - # Convert as_in and wildcard values separately - if list_nonwildcard: - as_in_expr = self.field_in_list_expression.format( - field=field, - op=op1, - list=list_nonwildcard - ) - wildcard_exprs_list = [] - if list_wildcards: - for arg in list_wildcards: - new_cond = ConditionFieldEqualsValueExpression(field=field, value=arg) - if arg[1:-1].contains_special(): # Wildcard in string, not at start or end. - # We need to get rid of all wildcards, and create a 'and contains' for each element in the list - expr = f'{self.token_separator}{self.and_token}{self.token_separator}'.join( - [self.contains_expression.format( - field=field, - value=self.convert_value_str( - SigmaString(x), state)) for x in arg.s if not isinstance(x, SpecialChars) - ] - ) - expr = self.group_expression.format(expr=expr) - else: - expr = self.convert_condition_field_eq_val_str(new_cond, state) - wildcard_exprs_list.append(expr) - wildcard_exprs = f'{self.token_separator}{op2}{self.token_separator}'.join(wildcard_exprs_list) - if as_in_expr and wildcard_exprs: - return as_in_expr + self.token_separator + op2 + self.token_separator + wildcard_exprs - return as_in_expr + wildcard_exprs - - def convert_condition_not(self, cond: ConditionNOT, state: ConversionState) -> Union[str, DeferredQueryExpression]: - """Conversion of NOT conditions. Overridden to surround the group or expr of the 'not' negation with parens, - as expected by KQL. - """ - arg = cond.args[0] - try: - if arg.__class__ in self.precedence: # group if AND or OR condition is negated - return self.not_token + "(" + self.convert_condition_group(arg, state) + ")" - else: - expr = self.convert_condition(arg, state) - if isinstance(expr, DeferredQueryExpression): # negate deferred expression and pass it to parent - return expr.negate() - else: # convert negated expression to string - return self.not_token + "(" + expr + ")" - except TypeError: # pragma: no cover - raise NotImplementedError("Operator 'not' not supported by the backend") - - def convert_value_str(self, s: SigmaString, state: ConversionState) -> str: - """Convert a SigmaString into a plain string which can be used in query.""" - converted = super().convert_value_str(s, state) - # If we have a wildcard in a string, we need to un-escape it - # See issue #13 - return re.sub(r"\\\*", r"*", converted) diff --git a/sigma/backends/microsoft365defender/kusto.py b/sigma/backends/microsoft365defender/kusto.py deleted file mode 100644 index c1cd066..0000000 --- a/sigma/backends/microsoft365defender/kusto.py +++ /dev/null @@ -1,216 +0,0 @@ -import re -from typing import ClassVar, Dict, Tuple, Pattern, Any, Union, Optional - -from sigma.conditions import ConditionItem, ConditionAND, ConditionOR, ConditionNOT, ConditionFieldEqualsValueExpression -from sigma.conversion.base import TextQueryBackend -from sigma.conversion.deferred import DeferredQueryExpression -from sigma.conversion.state import ConversionState -from sigma.processing.pipeline import ProcessingPipeline -from sigma.rule import SigmaRule -from sigma.types import SigmaCompareExpression, SigmaString, SpecialChars - - - -class KustoBackend(TextQueryBackend): - """Kusto Query LanguageBackend. """ - - # The backend generates grouping if required - name: ClassVar[str] = "Kusto backend" - identifier: ClassVar[str] = "kusto" - formats: Dict[str, str] = { - "default": "Kusto Query Language search strings", - } - - requires_pipeline: bool = False # m365 pipeline is automatically applied - - # Operator precedence - parenthesize = True - precedence: ClassVar[Tuple[ConditionItem, ConditionItem, ConditionItem]] = (ConditionNOT, ConditionAND, ConditionOR) - group_expression: ClassVar[ - str] = "({expr})" # Expression for precedence override grouping as format string with {expr} placeholder - # Generated query tokens - token_separator: str = " " # separator inserted between all boolean operators - or_token: ClassVar[str] = "or" - and_token: ClassVar[str] = "and" - not_token: ClassVar[str] = "not" - eq_token: ClassVar[str] = " =~ " # Token inserted between field and value (without separator) - - # String output - ## Fields - ### Quoting - field_quote: ClassVar[ - str] = "'" # Character used to quote field characters if field_quote_pattern matches (or not, depending on field_quote_pattern_negation). No field name quoting is done if not set. - field_quote_pattern: ClassVar[Pattern] = re.compile( - "^\\w+$") # Quote field names if this pattern (doesn't) matches, depending on field_quote_pattern_negation. Field name is always quoted if pattern is not set. - field_quote_pattern_negation: ClassVar[ - bool] = True # Negate field_quote_pattern result. Field name is quoted if pattern doesn't matches if set to True (default). - - ### Escaping - field_escape: ClassVar[str] = "" # Character to escape particular parts defined in field_escape_pattern. - field_escape_quote: ClassVar[bool] = True # Escape quote string defined in field_quote - field_escape_pattern: ClassVar[Pattern] = re.compile( - "\\s") # All matches of this pattern are prepended with the string contained in field_escape. - - ## Values - str_quote: ClassVar[str] = '"' # string quoting character (added as escaping character) - escape_char: ClassVar[str] = "\\" # Escaping character for special characrers inside string - wildcard_multi: ClassVar[str] = "*" # Character used as multi-character wildcard - wildcard_single: ClassVar[str] = "*" # Character used as single-character wildcard - add_escaped: ClassVar[str] = "\\" # Characters quoted in addition to wildcards and string quote - filter_chars: ClassVar[str] = "" # Characters filtered - bool_values: ClassVar[Dict[bool, str]] = { # Values to which boolean values are mapped. - True: "true", - False: "false", - } - - # String matching operators. if none is appropriate eq_token is used. - startswith_expression: ClassVar[str] = "{field} startswith {value}" - endswith_expression: ClassVar[str] = "{field} endswith {value}" - contains_expression: ClassVar[str] = "{field} contains {value}" - wildcard_match_expression: ClassVar[str] = None # Special expression if wildcards can't be matched with the eq_token operator - - # Regular expressions - re_expression: ClassVar[ - str] = "{field} matches regex \"{regex}\"" # Regular expression query as format string with placeholders {field} and {regex} - re_escape_char: ClassVar[str] = "\\" # Character used for escaping in regular expressions - re_escape: ClassVar[Tuple[str]] = () # List of strings that are escaped - re_escape_escape_char: bool = True # If True, the escape character is also escaped - - # cidr expressions - cidr_wildcard: ClassVar[str] = "*" # Character used as single wildcard - cidr_expression: ClassVar[ - str] = "ipv4_is_in_range({field}, \"{value}\")" # CIDR expression query as format string with placeholders {field} = {value} - cidr_in_list_expression: ClassVar[ - str] = "ipv4_is_in_any_range({field}, \"{value}\")" # CIDR expression query as format string with placeholders {field} = in({list}) - - # Numeric comparison operators - compare_op_expression: ClassVar[ - str] = "{field} {operator} {value}" # Compare operation query as format string with placeholders {field}, {operator} and {value} - # Mapping between CompareOperators elements and strings used as replacement for {operator} in compare_op_expression - compare_operators: ClassVar[Dict[SigmaCompareExpression.CompareOperators, str]] = { - SigmaCompareExpression.CompareOperators.LT: "<", - SigmaCompareExpression.CompareOperators.LTE: "<=", - SigmaCompareExpression.CompareOperators.GT: ">", - SigmaCompareExpression.CompareOperators.GTE: ">=", - } - - # Null/None expressions - field_null_expression: ClassVar[ - str] = "isnull({field})" # Expression for field has null value as format string with {field} placeholder for field name - - # Field value in list, e.g. "field in (value list)" or "field containsall (value list)" - convert_or_as_in: ClassVar[bool] = True # Convert OR as in-expression - convert_and_as_in: ClassVar[bool] = True # Convert AND as in-expression - in_expressions_allow_wildcards: ClassVar[ - bool] = True # Values in list can contain wildcards. If set to False (default) only plain values are converted into in-expressions. - field_in_list_expression: ClassVar[ - str] = "{field} {op} ({list})" # Expression for field in list of values as format string with placeholders {field}, {op} and {list} - or_in_operator: ClassVar[ - str] = "in~" # Operator used to convert OR into in-expressions. Must be set if convert_or_as_in is set - and_in_operator: ClassVar[ - str] = "has_all" # Operator used to convert AND into in-expressions. Must be set if convert_and_as_in is set - list_separator: ClassVar[str] = ", " # List element separator - - # Value not bound to a field - unbound_value_str_expression: ClassVar[ - str] = '{value}' # Expression for string value not bound to a field as format string with placeholder {value} - unbound_value_num_expression: ClassVar[ - str] = '{value}' # Expression for number value not bound to a field as format string with placeholder {value} - unbound_value_re_expression: ClassVar[ - str] = '_=~{value}' # Expression for regular expression not bound to a field as format string with placeholder {value} - - # Query finalization: appending and concatenating deferred query part - deferred_start: ClassVar[str] = "\n| " # String used as separator between main query and deferred parts - deferred_separator: ClassVar[str] = "\n| " # String used to join multiple deferred query parts - deferred_only_query: ClassVar[str] = "*" # String used as query if final query only contains deferred expression - - # We use =~ for eq_token so everything is case insensitive. But this cannot be used with ints/numbers in queries - # So we can define a new token to use for SigmaNumeric types and override convert_condition_field_eq_val_num - # to use it - num_eq_token: ClassVar[str] = " == " - - # Override methods - - # For numeric values, need == instead of =~ - def convert_condition_field_eq_val_num(self, cond: ConditionFieldEqualsValueExpression, state: ConversionState) -> \ - Union[str, DeferredQueryExpression]: - """Conversion of field = number value expressions""" - try: - return self.escape_and_quote_field(cond.field) + self.num_eq_token + str(cond.value) - except TypeError: # pragma: no cover - raise NotImplementedError("Field equals numeric value expressions are not supported by the backend.") - - def convert_condition_as_in_expression(self, - cond: Union[ConditionOR, ConditionAND], - state: ConversionState) -> Union[str, DeferredQueryExpression]: - """Overridden method for conversion of field in value list conditions. - KQL doesn't really use wildcards, so if we have an 'as_in' condition where one or more of the values has a wildcard, - we can still use the as_in condition, then append on the wildcard value(s) with a startswith, endswith, or contains - expression - """ - - field = self.escape_and_quote_field(cond.args[0].field) - op1 = self.or_in_operator if isinstance(cond, ConditionOR) else self.and_in_operator - op2 = self.or_token if isinstance(cond, ConditionOR) else self.and_token - list_nonwildcard = self.list_separator.join([ - self.convert_value_str(arg.value, state) - for arg in cond.args - if (isinstance(arg.value, SigmaString) and not arg.value.contains_special()) or not isinstance(arg.value, - SigmaString) - ]) - list_wildcards = [ - arg.value for arg in cond.args if isinstance(arg.value, SigmaString) and arg.value.contains_special() - ] - as_in_expr = "" - # Convert as_in and wildcard values separately - if list_nonwildcard: - as_in_expr = self.field_in_list_expression.format( - field=field, - op=op1, - list=list_nonwildcard - ) - wildcard_exprs_list = [] - if list_wildcards: - for arg in list_wildcards: - new_cond = ConditionFieldEqualsValueExpression(field=field, value=arg) - if arg[1:-1].contains_special(): # Wildcard in string, not at start or end. - # We need to get rid of all wildcards, and create a 'and contains' for each element in the list - expr = f'{self.token_separator}{self.and_token}{self.token_separator}'.join( - [self.contains_expression.format( - field=field, - value=self.convert_value_str( - SigmaString(x), state)) for x in arg.s if not isinstance(x, SpecialChars) - ] - ) - expr = self.group_expression.format(expr=expr) - else: - expr = self.convert_condition_field_eq_val_str(new_cond, state) - wildcard_exprs_list.append(expr) - wildcard_exprs = f'{self.token_separator}{op2}{self.token_separator}'.join(wildcard_exprs_list) - if as_in_expr and wildcard_exprs: - return as_in_expr + self.token_separator + op2 + self.token_separator + wildcard_exprs - return as_in_expr + wildcard_exprs - - def convert_condition_not(self, cond: ConditionNOT, state: ConversionState) -> Union[str, DeferredQueryExpression]: - """Conversion of NOT conditions. Overridden to surround the group or expr of the 'not' negation with parens, - as expected by KQL. - """ - arg = cond.args[0] - try: - if arg.__class__ in self.precedence: # group if AND or OR condition is negated - return self.not_token + "(" + self.convert_condition_group(arg, state) + ")" - else: - expr = self.convert_condition(arg, state) - if isinstance(expr, DeferredQueryExpression): # negate deferred expression and pass it to parent - return expr.negate() - else: # convert negated expression to string - return self.not_token + "(" + expr + ")" - except TypeError: # pragma: no cover - raise NotImplementedError("Operator 'not' not supported by the backend") - - def convert_value_str(self, s: SigmaString, state: ConversionState) -> str: - """Convert a SigmaString into a plain string which can be used in query.""" - converted = super().convert_value_str(s, state) - # If we have a wildcard in a string, we need to un-escape it - # See issue #13 - return re.sub(r"\\\*", r"*", converted) diff --git a/sigma/pipelines/microsoft365defender/__init__.py b/sigma/pipelines/microsoft365defender/__init__.py deleted file mode 100644 index 7368848..0000000 --- a/sigma/pipelines/microsoft365defender/__init__.py +++ /dev/null @@ -1,4 +0,0 @@ -from .microsoft365defender import microsoft_365_defender_pipeline -pipelines = { - "microsoft_365_defender_pipeline": microsoft_365_defender_pipeline, # TODO: adapt identifier to something approproiate -} \ No newline at end of file diff --git a/sigma/pipelines/microsoft365defender/finalization.py b/sigma/pipelines/microsoft365defender/finalization.py deleted file mode 100644 index 1fbae38..0000000 --- a/sigma/pipelines/microsoft365defender/finalization.py +++ /dev/null @@ -1,32 +0,0 @@ -from dataclasses import dataclass, field -from typing import Union, List - -from sigma.processing.finalization import Finalizer - - -@dataclass -class Microsoft365DefenderTableFinalizer(Finalizer): - """Finalizer for Microsoft 365 Defender Backend to add in the table name as a prefix to the query. - - The standard finalizers append all queries together into a single query string. However, this finalizer - will keep individual queries separate and add the table name as a prefix to each query, per ordering in the - pipeline's state 'query_table' key which is appended to for each rule by set for each rule by the - SetQueryTableStateTransformation transformation. - - A custom table name can be specified in the finalizer, otherwise the table name will be selected based on the category of the rule. - """ - - table_names: Union[str, List[str]] = field(default_factory=list) - - def apply(self, pipeline: "sigma.processing.pipeline.ProcessingPipeline", queries: List[str]) -> List[str]: - if isinstance(self.table_names, str): - self.table_names = [self.table_names] * len(queries) - - for i, query in enumerate(queries): - if self.table_names: - queries[i] = f"{self.table_names[i]}\n| where {query}" - elif 'query_table' in pipeline.state: - queries[i] = f"{pipeline.state['query_table'][i]}\n| where {query}" - else: - queries[i] = f"search {query}" - return queries diff --git a/sigma/pipelines/microsoft365defender/microsoft365defender.py b/sigma/pipelines/microsoft365defender/microsoft365defender.py deleted file mode 100644 index ed5e2eb..0000000 --- a/sigma/pipelines/microsoft365defender/microsoft365defender.py +++ /dev/null @@ -1,606 +0,0 @@ -from typing import Union, Optional, Iterable -from collections import defaultdict - -from sigma.exceptions import SigmaTransformationError -from sigma.pipelines.common import (logsource_windows_process_creation, logsource_windows_image_load, - logsource_windows_file_event, logsource_windows_file_delete, - logsource_windows_file_change, logsource_windows_file_access, - logsource_windows_file_rename, logsource_windows_registry_set, - logsource_windows_registry_add, logsource_windows_registry_delete, - logsource_windows_registry_event, logsource_windows_network_connection) -from sigma.processing.transformations import (FieldMappingTransformation, RuleFailureTransformation, - ReplaceStringTransformation, SetStateTransformation, - DetectionItemTransformation, ValueTransformation, - DetectionItemFailureTransformation, DropDetectionItemTransformation) -from sigma.processing.conditions import (IncludeFieldCondition, ExcludeFieldCondition, - DetectionItemProcessingItemAppliedCondition, LogsourceCondition) -from sigma.conditions import ConditionOR -from sigma.types import SigmaString, SigmaType -from sigma.processing.pipeline import ProcessingItem, ProcessingPipeline -from sigma.rule import SigmaDetectionItem, SigmaDetection -from .finalization import Microsoft365DefenderTableFinalizer -from .transformations import SetQueryTableStateTransformation - - -# CUSTOM TRANSFORMATIONS -## Custom DetectionItemTransformation to split domain and user, if applicable -class SplitDomainUserTransformation(DetectionItemTransformation): - """Custom DetectionItemTransformation transformation to split a User field into separate domain and user fields, - if applicable. This is to handle the case where the Sysmon `User` field may contain a domain AND username, and - Advanced Hunting queries separate out the domain and username into separate fields. - If a matching field_name_condition field uses the schema DOMAIN\\USER, a new SigmaDetectionItem - will be made for the Domain and put inside a SigmaDetection with the original User SigmaDetectionItem - (minus the domain) for the matching SigmaDetectionItem. - - You should use this with a field_name_condition for `IncludeFieldName(['field', 'names', 'for', 'username']`)""" - - def apply_detection_item(self, detection_item: SigmaDetectionItem) -> Optional[ - Union[SigmaDetection, SigmaDetectionItem]]: - to_return = [] - if not isinstance(detection_item.value, list): # Ensure its a list, but it most likely will be - detection_item.value = list(detection_item.value) - for d in detection_item.value: - username = d.to_plain().split("\\") - username_field_mappings = { - 'AccountName': 'AccountDomain', - 'RequestAccountName': 'RequestAccountDomain', - 'InitiatingProcessAccountName': 'InitiatingProcessAccountDomain', - } - if len(username) == 2: - domain = username[0] - username = [SigmaString(username[1])] - - domain_field = username_field_mappings.get(detection_item.field, "InitiatingProcessAccountDomain") - domain_value = [SigmaString(domain)] - user_detection_item = SigmaDetectionItem(field=detection_item.field, - modifiers=[], - value=username, - ) - domain_detection_item = SigmaDetectionItem(field=domain_field, - modifiers=[], - value=domain_value) - to_return.append(SigmaDetection(detection_items=[user_detection_item, domain_detection_item])) - else: - - to_return.append(SigmaDetection([SigmaDetectionItem(field=detection_item.field, - modifiers=detection_item.modifiers, - value=username)])) - return SigmaDetection(to_return) - - -## Custom DetectionItemTransformation to regex hash algos/values in Hashes field, if applicable -class HashesValuesTransformation(DetectionItemTransformation): - """Custom DetectionItemTransformation to take a list of values in the 'Hashes' field, which are expected to be - 'algo:hash_value', and create new SigmaDetectionItems for each hash type, where the values is a list of - SigmaString hashes. If the hash type is not part of the value, it will be inferred based on length. - - Use with field_name_condition for Hashes field""" - - def apply_detection_item(self, detection_item: SigmaDetectionItem) -> Optional[ - Union[SigmaDetection, SigmaDetectionItem]]: - to_return = [] - no_valid_hash_algo = True - algo_dict = defaultdict(list) # map to keep track of algos and lists of values - if not isinstance(detection_item.value, list): - detection_item.value = [detection_item.value] - for d in detection_item.value: - hash_value = d.to_plain().split("|") # sometimes if ALGO|VALUE - if len(hash_value) == 1: # and sometimes its ALGO=VALUE - hash_value = hash_value[0].split("=") - if len(hash_value) == 2: - hash_algo = hash_value[0].lstrip("*").upper() if hash_value[0].lstrip("*").upper() in ['MD5', 'SHA1', 'SHA256'] else "" - if hash_algo: - no_valid_hash_algo = False - hash_value = hash_value[1] - else: - hash_value = hash_value[0] - if len(hash_value) == 32: # MD5 - hash_algo = 'MD5' - no_valid_hash_algo = False - elif len(hash_value) == 40: # SHA1 - hash_algo = 'SHA1' - no_valid_hash_algo = False - elif len(hash_value) == 64: # SHA256 - hash_algo = "SHA256" - no_valid_hash_algo = False - else: # Invalid algo, no fieldname for keyword search - hash_algo = '' - algo_dict[hash_algo].append(hash_value) - if no_valid_hash_algo: - raise InvalidHashAlgorithmError( - "No valid hash algo found in Hashes field. Advanced Hunting Queries do not support the " - "IMPHASH field. Ensure the detection item has at least one MD5, SHA1, or SHA265 hash field/value" - ) - for k, v in algo_dict.items(): - if k: # Filter out invalid hash algo types - to_return.append(SigmaDetectionItem(field=k if k != 'keyword' else None, - modifiers=[], - value=[SigmaString(x) for x in v])) - return SigmaDetection(detection_items=to_return, item_linking=ConditionOR) - - -## Change ActionType value AFTER field transformations from Sysmon values to DeviceRegistryEvents values -class RegistryActionTypeValueTransformation(ValueTransformation): - """Custom ValueTransformation transformation. The Microsoft DeviceRegistryEvents table expect the ActionType to - be a slightly different set of values than what Sysmon specified, so this will change them to the correct value.""" - value_mappings = { # Sysmon EventType -> DeviceRegistyEvents ActionType - 'CreateKey': 'RegistryKeyCreated', - 'DeleteKey': ['RegistryKeyDeleted', 'RegistryValueDeleted'], - 'SetValue': 'RegistryValueSet', - 'RenameKey': ['RegistryValueSet', 'RegistryKeyCreated'], - } - - def apply_value(self, field: str, val: SigmaType) -> Optional[Union[SigmaType, Iterable[SigmaType]]]: - mapped_vals = self.value_mappings.get(val.to_plain(), val.to_plain()) - if isinstance(mapped_vals, list): - return [SigmaString(v) for v in mapped_vals] - return SigmaString(mapped_vals) - - -# Extract parent process name from ParentImage after applying ParentImage field mapping -class ParentImageValueTransformation(ValueTransformation): - """Custom ValueTransformation transformation. Unfortunately, none of the table schemas have - InitiatingProcessParentFolderPath like they do InitiatingProcessFolderPath. Due to this, we cannot directly map the - Sysmon `ParentImage` field to a table field. However, InitiatingProcessParentFileName is an available field in - nearly all tables, so we will extract the process name and use that instead. - - Use this transformation BEFORE mapping ParentImage to InitiatingProcessFileName - """ - - def apply_value(self, field: str, val: SigmaType) -> Optional[Union[SigmaType, Iterable[SigmaType]]]: - parent_process_name = str(val.to_plain().split("\\")[-1].split("/")[-1]) - return SigmaString(parent_process_name) - - -class InvalidFieldTransformation(DetectionItemFailureTransformation): - """ - Overrides the apply_detection_item() method from DetectionItemFailureTransformation to also include the field name - in the error message - """ - - def apply_detection_item(self, detection_item: SigmaDetectionItem) -> None: - field_name = detection_item.field - self.message = f"Invalid SigmaDetectionItem field name encountered: {field_name}. " + self.message - raise SigmaTransformationError(self.message) - - -class InvalidHashAlgorithmError(Exception): - pass - - - -# FIELD MAPPINGS -## Field mappings from Sysmon (where applicable) fields to Advanced Hunting Query fields based on schema in tables -## See: https://learn.microsoft.com/en-us/microsoft-365/security/defender/advanced-hunting-schema-tables?view=o365-worldwide#learn-the-schema-tables -query_table_field_mappings = { - 'DeviceProcessEvents': { # process_creation, Sysmon EventID 1 -> DeviceProcessEvents table - # ProcessGuid: ?, - 'ProcessId': 'ProcessId', - 'Image': 'FolderPath', - 'FileVersion': 'ProcessVersionInfoProductVersion', - 'Description': 'ProcessVersionInfoFileDescription', - 'Product': 'ProcessVersionInfoProductName', - 'Company': 'ProcessVersionInfoCompanyName', - 'OriginalFileName': 'ProcessVersionInfoOriginalFileName', - 'CommandLine': 'ProcessCommandLine', - # CurrentDirectory: ? - 'User': 'AccountName', - # LogonGuid: ? - 'LogonId': 'LogonId', - # TerminalSessionId: ? - 'IntegrityLevel': 'ProcessIntegrityLevel', - 'sha1': 'SHA1', - 'sha256': 'SHA256', - 'md5': 'MD5', - # 'ParentProcessGuid': ?, - 'ParentProcessId': 'InitiatingProcessId', - 'ParentImage': 'InitiatingProcessFolderPath', - 'ParentCommandLine': 'InitiatingProcessCommandLine', - 'ParentUser': 'InitiatingProcessAccountName', - }, - 'DeviceImageLoadEvents': { - # 'ProcessGuid': ?, - 'ProcessId': 'InitiatingProcessId', - 'Image': 'InitiatingProcessFolderPath', # File path of the process that loaded the image - 'ImageLoaded': 'FolderPath', - 'FileVersion': 'InitiatingProcessVersionInfoProductVersion', - 'Description': 'InitiatingProcessVersionInfoFileDescription', - 'Product': 'InitiatingProcessVersionInfoProductName', - 'Company': 'InitiatingProcessVersionInfoCompanyName', - 'OriginalFileName': 'InitiatingProcessVersionInfoOriginalFileName', - # 'Hashes': ?, - 'sha1': 'SHA1', - 'sha256': 'SHA256', - 'md5': 'MD5', - # 'Signed': ? - # 'Signature': ? - # 'SignatureStatus': ? - 'User': 'InitiatingProcessAccountName' - }, - 'DeviceFileEvents': { # file_*, Sysmon EventID 11 (create), 23 (delete) -> DeviceFileEvents table - # 'ProcessGuid': ?, - 'ProcessId': 'InitiatingProcessId', - 'Image': 'InitiatingProcessFolderPath', - 'TargetFilename': 'FolderPath', - # 'CreationUtcTime': 'Timestamp', - 'User': 'RequestAccountName', - # 'Hashes': ?, - 'sha1': 'SHA1', - 'sha256': 'SHA256', - 'md5': 'MD5', - }, - 'DeviceNetworkEvents': { # network_connection, Sysmon EventID 3 -> DeviceNetworkEvents table - # 'ProcessGuid': ?, - 'ProcessId': 'InitiatingProcessId', - 'Image': 'InitiatingProcessFolderPath', - 'User': 'InitiatingProcessAccountName', - 'Protocol': 'Protocol', - # 'Initiated': ?, - # 'SourceIsIpv6': ?, - 'SourceIp': 'LocalIP', - 'SourceHostname': 'DeviceName', - 'SourcePort': 'LocalPort', - # 'SourcePortName': ?, - # 'DestinationIsIpv6': ?, - 'DestinationIp': 'RemoteIP', - 'DestinationHostname': 'RemoteUrl', - 'DestinationPort': 'RemotePort', - # 'DestinationPortName': ?, - }, - "DeviceRegistryEvents": { - # registry_*, Sysmon EventID 12 (create/delete), 13 (value set), 14 (key/value rename) -> DeviceRegistryEvents table, - 'EventType': 'ActionType', - # 'ProcessGuid': ?, - 'ProcessId': 'InitiatingProcessId', - 'Image': 'InitiatingProcessFolderPath', - 'TargetObject': 'RegistryKey', - # 'NewName': ? - 'Details': 'RegistryValueData', - 'User': 'InitiatingProcessAccountName' - } -} - -## Generic catch-all field mappings for sysmon -> microsoft 365 defender fields that appear in most tables and -## haven't been mapped already -generic_field_mappings = { - 'EventType': 'ActionType', - 'User': 'InitiatingProcessAccountName', - 'CommandLine': 'InitiatingProcessCommandLine', - 'Image': 'InitiatingProcessFolderPath', - 'SourceImage': 'InitiatingProcessFolderPath', - 'ProcessId': 'InitiatingProcessId', - 'md5': 'InitiatingProcessMD5', - 'sha1': 'InitiatingProcessSHA1', - 'sha256': 'InitiatingProcessSHA256', - 'ParentProcessId': 'InitiatingProcessParentId', - 'ParentCommandLine': 'InitiatingProcessParentCommandLine', - 'Company': 'InitiatingProcessVersionInfoCompanyName', - 'Description': 'InitiatingProcessVersionInfoFileDescription', - 'OriginalFileName': 'InitiatingProcessVersionInfoOriginalFileName', - 'Product': 'InitiatingProcessVersionInfoProductName' -} - -# VALID FIELDS PER QUERY TABLE -## Will Implement field checking later once issue with removing fields is figured out, for now it fails the pipeline -## dict of {'table_name': [list, of, valid_fields]} for each table -valid_fields_per_table = { - 'DeviceProcessEvents': ['Timestamp', 'DeviceId', 'DeviceName', 'ActionType', 'FileName', 'FolderPath', 'SHA1', - 'SHA256', 'MD5', 'FileSize', 'ProcessVersionInfoCompanyName', - 'ProcessVersionInfoProductName', 'ProcessVersionInfoProductVersion', - 'ProcessVersionInfoInternalFileName', 'ProcessVersionInfoOriginalFileName', - 'ProcessVersionInfoFileDescription', 'ProcessId', 'ProcessCommandLine', - 'ProcessIntegrityLevel', 'ProcessTokenElevation', 'ProcessCreationTime', 'AccountDomain', - 'AccountName', 'AccountSid', 'AccountUpn', 'AccountObjectId', 'LogonId', - 'InitiatingProcessAccountDomain', 'InitiatingProcessAccountName', - 'InitiatingProcessAccountSid', 'InitiatingProcessAccountUpn', - 'InitiatingProcessAccountObjectId', 'InitiatingProcessLogonId', - 'InitiatingProcessIntegrityLevel', 'InitiatingProcessTokenElevation', - 'InitiatingProcessSHA1', 'InitiatingProcessSHA256', 'InitiatingProcessMD5', - 'InitiatingProcessFileName', 'InitiatingProcessFileSize', - 'InitiatingProcessVersionInfoCompanyName', 'InitiatingProcessVersionInfoProductName', - 'InitiatingProcessVersionInfoProductVersion', - 'InitiatingProcessVersionInfoInternalFileName', - 'InitiatingProcessVersionInfoOriginalFileName', - 'InitiatingProcessVersionInfoFileDescription', 'InitiatingProcessId', - 'InitiatingProcessCommandLine', 'InitiatingProcessCreationTime', - 'InitiatingProcessFolderPath', 'InitiatingProcessParentId', - 'InitiatingProcessParentFileName', 'InitiatingProcessParentCreationTime', - 'InitiatingProcessSignerType', 'InitiatingProcessSignatureStatus', 'ReportId', - 'AppGuardContainerId', 'AdditionalFields'], - 'DeviceImageLoadEvents': ['Timestamp', 'DeviceId', 'DeviceName', 'ActionType', 'FileName', 'FolderPath', 'SHA1', - 'SHA256', 'MD5', 'FileSize', 'InitiatingProcessAccountDomain', - 'InitiatingProcessAccountName', 'InitiatingProcessAccountSid', - 'InitiatingProcessAccountUpn', 'InitiatingProcessAccountObjectId', - 'InitiatingProcessIntegrityLevel', 'InitiatingProcessTokenElevation', - 'InitiatingProcessSHA1', 'InitiatingProcessSHA256', 'InitiatingProcessMD5', - 'InitiatingProcessFileName', 'InitiatingProcessFileSize', - 'InitiatingProcessVersionInfoCompanyName', 'InitiatingProcessVersionInfoProductName', - 'InitiatingProcessVersionInfoProductVersion', - 'InitiatingProcessVersionInfoInternalFileName', - 'InitiatingProcessVersionInfoOriginalFileName', - 'InitiatingProcessVersionInfoFileDescription', 'InitiatingProcessId', - 'InitiatingProcessCommandLine', 'InitiatingProcessCreationTime', - 'InitiatingProcessFolderPath', 'InitiatingProcessParentId', - 'InitiatingProcessParentFileName', 'InitiatingProcessParentCreationTime', 'ReportId', - 'AppGuardContainerId'], - 'DeviceFileEvents': ['Timestamp', 'DeviceId', 'DeviceName', 'ActionType', 'FileName', 'FolderPath', 'SHA1', - 'SHA256', 'MD5', 'FileOriginUrl', 'FileOriginReferrerUrl', 'FileOriginIP', - 'PreviousFolderPath', 'PreviousFileName', 'FileSize', 'InitiatingProcessAccountDomain', - 'InitiatingProcessAccountName', 'InitiatingProcessAccountSid', 'InitiatingProcessAccountUpn', - 'InitiatingProcessAccountObjectId', 'InitiatingProcessMD5', 'InitiatingProcessSHA1', - 'InitiatingProcessSHA256', 'InitiatingProcessFolderPath', 'InitiatingProcessFileName', - 'InitiatingProcessFileSize', 'InitiatingProcessVersionInfoCompanyName', - 'InitiatingProcessVersionInfoProductName', 'InitiatingProcessVersionInfoProductVersion', - 'InitiatingProcessVersionInfoInternalFileName', 'InitiatingProcessVersionInfoOriginalFileName', - 'InitiatingProcessVersionInfoFileDescription', 'InitiatingProcessId', - 'InitiatingProcessCommandLine', 'InitiatingProcessCreationTime', - 'InitiatingProcessIntegrityLevel', 'InitiatingProcessTokenElevation', - 'InitiatingProcessParentId', 'InitiatingProcessParentFileName', - 'InitiatingProcessParentCreationTime', 'RequestProtocol', 'RequestSourceIP', - 'RequestSourcePort', 'RequestAccountName', 'RequestAccountDomain', 'RequestAccountSid', - 'ShareName', 'InitiatingProcessFileSize', 'SensitivityLabel', 'SensitivitySubLabel', - 'IsAzureInfoProtectionApplied', 'ReportId', 'AppGuardContainerId', 'AdditionalFields'], - 'DeviceRegistryEvents': ['Timestamp', 'DeviceId', 'DeviceName', 'ActionType', 'RegistryKey', 'RegistryValueType', - 'RegistryValueName', 'RegistryValueData', 'PreviousRegistryKey', - 'PreviousRegistryValueName', 'PreviousRegistryValueData', 'InitiatingProcessAccountDomain', - 'InitiatingProcessAccountName', 'InitiatingProcessAccountSid', - 'InitiatingProcessAccountUpn', 'InitiatingProcessAccountObjectId', 'InitiatingProcessSHA1', - 'InitiatingProcessSHA256', 'InitiatingProcessMD5', 'InitiatingProcessFileName', - 'InitiatingProcessFileSize', 'InitiatingProcessVersionInfoCompanyName', - 'InitiatingProcessVersionInfoProductName', 'InitiatingProcessVersionInfoProductVersion', - 'InitiatingProcessVersionInfoInternalFileName', - 'InitiatingProcessVersionInfoOriginalFileName', - 'InitiatingProcessVersionInfoFileDescription', 'InitiatingProcessId', - 'InitiatingProcessCommandLine', 'InitiatingProcessCreationTime', - 'InitiatingProcessFolderPath', 'InitiatingProcessParentId', - 'InitiatingProcessParentFileName', 'InitiatingProcessParentCreationTime', - 'InitiatingProcessIntegrityLevel', 'InitiatingProcessTokenElevation', 'ReportId', - 'AppGuardContainerId'], - 'DeviceNetworkEvents': ['Timestamp', 'DeviceId', 'DeviceName', 'ActionType', 'RemoteIP', 'RemotePort', 'RemoteUrl', - 'LocalIP', 'LocalPort', 'Protocol', 'LocalIPType', 'RemoteIPType', 'InitiatingProcessSHA1', - 'InitiatingProcessSHA256', 'InitiatingProcessMD5', 'InitiatingProcessFileName', - 'InitiatingProcessFileSize', 'InitiatingProcessVersionInfoCompanyName', - 'InitiatingProcessVersionInfoProductName', 'InitiatingProcessVersionInfoProductVersion', - 'InitiatingProcessVersionInfoInternalFileName', - 'InitiatingProcessVersionInfoOriginalFileName', - 'InitiatingProcessVersionInfoFileDescription', 'InitiatingProcessId', - 'InitiatingProcessCommandLine', 'InitiatingProcessCreationTime', - 'InitiatingProcessFolderPath', 'InitiatingProcessParentFileName', - 'InitiatingProcessParentId', 'InitiatingProcessParentCreationTime', - 'InitiatingProcessAccountDomain', 'InitiatingProcessAccountName', - 'InitiatingProcessAccountSid', 'InitiatingProcessAccountUpn', - 'InitiatingProcessAccountObjectId', 'InitiatingProcessIntegrityLevel', - 'InitiatingProcessTokenElevation', 'ReportId', 'AppGuardContainerId', 'AdditionalFields']} - -# Mapping from ParentImage to InitiatingProcessParentFileName. Must be used alongside of ParentImageValueTransformation -parent_image_field_mapping = {'ParentImage': 'InitiatingProcessParentFileName'} - -# OTHER MAPPINGS -## useful for creating ProcessingItems() with list comprehension - -## Query Table names -> rule categories -table_to_category_mappings = { - 'DeviceProcessEvents': ['process_creation'], - 'DeviceImageLoadEvents': ['image_load'], - 'DeviceFileEvents': ['file_access', 'file_change', 'file_delete', 'file_event', 'file_rename'], - 'DeviceRegistryEvents': ['registry_add', 'registry_delete', 'registry_event', 'registry_set'], - 'DeviceNetworkEvents': ['network_connection'] -} - -## rule categories -> RuleConditions -category_to_conditions_mappings = { - 'process_creation': logsource_windows_process_creation(), - 'image_load': logsource_windows_image_load(), - 'file_access': logsource_windows_file_access(), - 'file_change': logsource_windows_file_change(), - 'file_delete': logsource_windows_file_delete(), - 'file_event': logsource_windows_file_event(), - 'file_rename': logsource_windows_file_rename(), - 'registry_add': logsource_windows_registry_add(), - 'registry_delete': logsource_windows_registry_delete(), - 'registry_event': logsource_windows_registry_event(), - 'registry_set': logsource_windows_registry_set(), - 'network_connection': logsource_windows_network_connection() -} - -# PROCESSING_ITEMS() -## ProcessingItems to set state key 'query_table' to use in backend -## i.e. $QueryTable$ | $rest_of_query$ -query_table_proc_items = [ - ProcessingItem( - identifier=f"microsoft_365_defender_set_query_table_{table_name}", - transformation=SetQueryTableStateTransformation(table_name), - rule_conditions=[ - category_to_conditions_mappings[rule_category] for rule_category in rule_categories - ], - rule_condition_linking=any, - ) - for table_name, rule_categories in table_to_category_mappings.items() -] - -## Fieldmappings -fieldmappings_proc_items = [ - ProcessingItem( - identifier=f"microsoft_365_defender_fieldmappings_{table_name}", - transformation=FieldMappingTransformation(query_table_field_mappings[table_name]), - rule_conditions=[ - category_to_conditions_mappings[rule_category] for rule_category in rule_categories - ], - rule_condition_linking=any, - ) - for table_name, rule_categories in table_to_category_mappings.items() -] - -## Generic Fielp Mappings, keep this last -## Exclude any fields already mapped. For example, if process_creation events ProcessId has already -## been mapped to the same field name (ProcessId), we don't to remap it to InitiatingProcessId -generic_field_mappings_proc_item = [ProcessingItem( - identifier="microsoft_365_defender_fieldmappings_generic", - transformation=FieldMappingTransformation( - generic_field_mappings - ), - detection_item_conditions=[ - DetectionItemProcessingItemAppliedCondition(f"microsoft_365_defender_fieldmappings_{table_name}") - for table_name in table_to_category_mappings.keys() - ], - detection_item_condition_linking=any, - detection_item_condition_negation=True, -) -] - -## Field Value Replacements ProcessingItems -replacement_proc_items = [ - # Sysmon uses abbreviations in RegistryKey values, replace with full key names as the DeviceRegistryEvents schema - # expects them to be - # Note: Ensure this comes AFTER field mapping renames, as we're specifying DeviceRegistryEvent fields - # - # Do this one first, or else the HKLM only one will replace HKLM and mess up the regex - ProcessingItem( - identifier="microsoft_365_defender_registry_key_replace_currentcontrolset", - transformation=ReplaceStringTransformation(regex=r"(?i)(^HKLM\\SYSTEM\\CurrentControlSet)", - replacement=r"HKEY_LOCAL_MACHINE\\SYSTEM\\CurrentControlSet001"), - field_name_conditions=[IncludeFieldCondition(['RegistryKey', 'PreviousRegistryKey'])] - ), - ProcessingItem( - identifier="microsoft_365_defender_registry_key_replace_hklm", - transformation=ReplaceStringTransformation(regex=r"(?i)(^HKLM)", - replacement=r"HKEY_LOCAL_MACHINE"), - field_name_conditions=[IncludeFieldCondition(['RegistryKey', 'PreviousRegistryKey'])] - ), - ProcessingItem( - identifier="microsoft_365_defender_registry_key_replace_hku", - transformation=ReplaceStringTransformation(regex=r"(?i)(^HKU)", - replacement=r"HKEY_USERS"), - field_name_conditions=[IncludeFieldCondition(['RegistryKey', 'PreviousRegistryKey'])] - ), - ProcessingItem( - identifier="microsoft_365_defender_registry_key_replace_hkcr", - transformation=ReplaceStringTransformation(regex=r"(?i)(^HKCR)", - replacement=r"HKEY_LOCAL_MACHINE\\CLASSES"), - field_name_conditions=[IncludeFieldCondition(['RegistryKey', 'PreviousRegistryKey'])] - ), - ProcessingItem( - identifier="microsoft_365_defender_registry_actiontype_value", - transformation=RegistryActionTypeValueTransformation(), - field_name_conditions=[IncludeFieldCondition(['ActionType'])] - ), - # Extract Domain from Username fields - ProcessingItem( - identifier="microsoft_365_defender_domain_username_extract", - transformation=SplitDomainUserTransformation(), - field_name_conditions=[IncludeFieldCondition(["AccountName", "InitiatingProcessAccountName"])] - ), - ProcessingItem( - identifier="microsoft_365_defender_hashes_field_values", - transformation=HashesValuesTransformation(), - field_name_conditions=[IncludeFieldCondition(['Hashes'])] - ), - # Processing item to essentially ignore initiated field - ProcessingItem( - identifier="microsoft_365_defender_network_initiated_field", - transformation=DropDetectionItemTransformation(), - field_name_conditions=[IncludeFieldCondition(['Initiated'])], - rule_conditions=[LogsourceCondition(category='network_connection')], - ) -] - -# ParentImage -> InitiatingProcessParentFileName -parent_image_proc_items = [ - # First apply fieldmapping from ParentImage to InitiatingProcessParentFileName for non process-creation rules - ProcessingItem( - identifier="microsoft_365_defender_parent_image_fieldmapping", - transformation=FieldMappingTransformation(parent_image_field_mapping), - rule_conditions=[ - # Exclude process_creation events, there's direct field mapping in this schema table - LogsourceCondition(category='process_creation') - ], - rule_condition_negation=True - ), - # Second, extract the parent process name from the full path - ProcessingItem( - identifier="microsoft_365_defender_parent_image_name_value", - transformation=ParentImageValueTransformation(), - field_name_conditions=[ - IncludeFieldCondition(["InitiatingProcessParentFileName"]), - ], - rule_conditions=[ - # Exclude process_creation events, there's direct field mapping in this schema table - LogsourceCondition(category='process_creation') - ], - rule_condition_negation=True - ) - -] - -## Exceptions/Errors ProcessingItems -rule_error_proc_items = [ - # Category Not Supported - ProcessingItem( - identifier="microsoft_365_defender_unsupported_rule_category", - rule_condition_linking=any, - transformation=RuleFailureTransformation( - "Rule category not yet supported by the Microsoft 365 Defender Sigma backend." - ), - rule_condition_negation=True, - rule_conditions=[x for x in category_to_conditions_mappings.values()], - )] - -field_error_proc_items = [ - # Invalid fields per category - ProcessingItem( - identifier=f"microsoft_365_defender_unsupported_fields_{table_name}", - transformation=InvalidFieldTransformation( - f"Please use valid fields for the {table_name} table, or the following fields that have keymappings in this " - f"pipeline:\n" - # Combine field mappings for table and generic field mappings dicts, get the unique keys, add the Hashes field, sort it - f"{', '.join(sorted(set({**query_table_field_mappings[table_name], **generic_field_mappings}.keys()).union({'Hashes'})))}" - ), - field_name_conditions=[ - ExcludeFieldCondition(fields=table_fields + list(generic_field_mappings.keys()) + ['Hashes'])], - rule_conditions=[ - category_to_conditions_mappings[rule_category] - for rule_category in table_to_category_mappings[table_name] - ], - rule_condition_linking=any, - ) - for table_name, table_fields in valid_fields_per_table.items() -] - - -def microsoft_365_defender_pipeline(transform_parent_image: Optional[bool] = True, query_table: Optional[str] = None) -> ProcessingPipeline: - """Pipeline for transformations for SigmaRules to use in the Microsoft 365 Defender Backend - Field mappings based on documentation found here: - https://learn.microsoft.com/en-us/microsoft-365/security/defender/advanced-hunting-query-language?view=o365-worldwide - - :param query_table: If specified, the table name will be used in the finalizer, otherwise the table name will be selected based on the category of the rule. - :type query_table: Optional[str] - :param transform_parent_image: If True, the ParentImage field will be mapped to InitiatingProcessParentFileName, and - the parent process name in the ParentImage will be extracted and used. This is because the Microsoft 365 Defender - table schema does not contain a InitiatingProcessParentFolderPath field like it does for InitiatingProcessFolderPath. - i.e. ParentImage: C:\\Windows\\System32\\whoami.exe -> InitiatingProcessParentFileName: whoami.exe. - Defaults to True - :type transform_parent_image: Optional[bool] - - :return: ProcessingPipeline for Microsoft 365 Defender Backend - :rtype: ProcessingPipeline - """ - - pipeline_items = [ - *query_table_proc_items, - *fieldmappings_proc_items, - *generic_field_mappings_proc_item, - *replacement_proc_items, - *rule_error_proc_items, - *field_error_proc_items, - ] - - if transform_parent_image: - pipeline_items[4:4] = parent_image_proc_items - - return ProcessingPipeline( - name="Generic Log Sources to Windows 365 Defender Transformation", - priority=10, - items=pipeline_items, - allowed_backends=frozenset(["kusto"]), - finalizers=[Microsoft365DefenderTableFinalizer(table_names=query_table)] - ) diff --git a/sigma/pipelines/microsoft365defender/transformations.py b/sigma/pipelines/microsoft365defender/transformations.py deleted file mode 100644 index 9b8f779..0000000 --- a/sigma/pipelines/microsoft365defender/transformations.py +++ /dev/null @@ -1,14 +0,0 @@ -from sigma.processing.transformations import Transformation -from dataclasses import dataclass -from typing import Any - - -@dataclass -class SetQueryTableStateTransformation(Transformation): - """Appends rule query table to pipeline state query_table key""" - - val: Any = None - - def apply(self, pipeline: "sigma.processing.pipeline.Proces", rule: "sigma.rule.SigmaRule") -> None: - super().apply(pipeline, rule) - pipeline.state['query_table'] = pipeline.state.get('query_table', []) + [self.val] diff --git a/sigma/pipelines/sentinelasim/__init__.py b/sigma/pipelines/sentinelasim/__init__.py deleted file mode 100644 index 2791782..0000000 --- a/sigma/pipelines/sentinelasim/__init__.py +++ /dev/null @@ -1,4 +0,0 @@ -from .sentinelasim import sentinel_asim_pipeline -pipelines = { - "sentinel_asim": sentinel_asim_pipeline, # TODO: adapt identifier to something approproiate -} \ No newline at end of file diff --git a/sigma/pipelines/sentinelasim/sentinelasim.py b/sigma/pipelines/sentinelasim/sentinelasim.py deleted file mode 100644 index e2864ec..0000000 --- a/sigma/pipelines/sentinelasim/sentinelasim.py +++ /dev/null @@ -1,499 +0,0 @@ -from typing import Union, Optional, Iterable -from collections import defaultdict - -from sigma.exceptions import SigmaTransformationError -from sigma.pipelines.common import (logsource_windows_process_creation, logsource_windows_image_load, - logsource_windows_file_event, logsource_windows_file_delete, - logsource_windows_file_change, logsource_windows_file_access, - logsource_windows_file_rename, logsource_windows_registry_set, - logsource_windows_registry_add, logsource_windows_registry_delete, - logsource_windows_registry_event, logsource_windows_network_connection) -from sigma.processing.transformations import (FieldMappingTransformation, RuleFailureTransformation, - ReplaceStringTransformation, SetStateTransformation, - DetectionItemTransformation, ValueTransformation, - DetectionItemFailureTransformation, DropDetectionItemTransformation) -from sigma.processing.conditions import (IncludeFieldCondition, ExcludeFieldCondition, - DetectionItemProcessingItemAppliedCondition, LogsourceCondition) -from sigma.conditions import ConditionOR -from sigma.types import SigmaString, SigmaType -from sigma.processing.pipeline import ProcessingItem, ProcessingPipeline -from sigma.rule import SigmaDetectionItem, SigmaDetection - -from .microsoft365defender import ( - SplitDomainUserTransformation, - HashesValuesTransformation, - RegistryActionTypeValueTransformation, - ParentImageValueTransformation, - InvalidFieldTransformation, -) - -process_events_table = 'imProcessCreate' -registry_events_table = 'imRegistry' -file_events_table = 'imFileEvent' - -# FIELD MAPPINGS -## Field mappings from Sysmon (where applicable) fields to Advanced Hunting Query fields based on schema in tables -## See: https://learn.microsoft.com/en-us/microsoft-365/security/defender/advanced-hunting-schema-tables?view=o365-worldwide#learn-the-schema-tables -query_table_field_mappings = { - process_events_table: { # process_creation, Sysmon EventID 1 -> DeviceProcessEvents table - 'ProcessGuid': 'TargetProcessGuid', - 'ProcessId': 'TargetProcessId', - 'Image': 'TargetProcessName', - 'FileVersion': 'TargetProcessFileVersion', - 'Description': 'TargetProcessFileDescription', - 'Product': 'TargetProcessFileProduct', - 'Company': 'TargetProcessFileCompany', - 'OriginalFileName': 'TargetProcessFilename', - 'CommandLine': 'TargetProcessCommandLine', - 'CurrentDirectory': 'TargetProcessCurrentDirectory', - 'User': 'TargetUsername', - 'LogonGuid': 'TargetUserSessionGuid', - 'LogonId': 'TargetUsername', - 'TerminalSessionId': 'TargetUserSessionId', - 'IntegrityLevel': 'TargetProcessIntegrityLevel', - 'sha1': 'TargetProcessSHA1', - 'sha256': 'TargetProcessSHA256', - 'md5': 'TargetProcessMD5', - 'ParentProcessGuid': 'ActingProcessGuid', - 'ParentProcessId': 'ActingProcessId', - 'ParentImage': 'ActingProcessName', - 'ParentCommandLine': 'ActingProcessCommandLine', - 'ParentUser': 'ActorUsername', - 'ProcessVersionInfoOriginalFileName':'TargetProcessFileVersion', - 'ProcessVersionInfoFileDescription':'TargetProcessFileDescription', - 'ProcessIntegrityLevel': 'TargetProcessIntegrityLevel', - 'InitiatingProcessFolderPath': 'ActingProcessName', - 'InitiatingProcessCommandLine': 'ActingProcessCommandLine', - }, - 'DeviceImageLoadEvents': { - # 'ProcessGuid': ?, - 'ProcessId': 'InitiatingProcessId', - 'Image': 'InitiatingProcessFolderPath', # File path of the process that loaded the image - 'ImageLoaded': 'FolderPath', - 'FileVersion': 'InitiatingProcessVersionInfoProductVersion', - 'Description': 'InitiatingProcessVersionInfoFileDescription', - 'Product': 'InitiatingProcessVersionInfoProductName', - 'Company': 'InitiatingProcessVersionInfoCompanyName', - 'OriginalFileName': 'InitiatingProcessVersionInfoOriginalFileName', - # 'Hashes': ?, - 'sha1': 'SHA1', - 'sha256': 'SHA256', - 'md5': 'MD5', - # 'Signed': ? - # 'Signature': ? - # 'SignatureStatus': ? - 'User': 'InitiatingProcessAccountName' - }, - file_events_table: { # file_*, Sysmon EventID 11 (create), 23 (delete) -> DeviceFileEvents table - 'ProcessGuid': 'ActingProcessGuid', - 'ProcessId': 'ActingProcessId', - 'Image': 'ActingProcessName', - 'TargetFilename': 'TargetFileName', - 'CreationUtcTime': 'TargetFileCreationTime', - 'User': 'ActorUsername', - # 'Hashes': ?, - # 'sha1': 'SHA1', - # 'sha256': 'SHA256', - # 'md5': 'MD5', - }, - 'DeviceNetworkEvents': { # network_connection, Sysmon EventID 3 -> DeviceNetworkEvents table - # 'ProcessGuid': ?, - 'ProcessId': 'InitiatingProcessId', - 'Image': 'InitiatingProcessFolderPath', - 'User': 'InitiatingProcessAccountName', - 'Protocol': 'Protocol', - # 'Initiated': ?, - # 'SourceIsIpv6': ?, - 'SourceIp': 'LocalIP', - 'SourceHostname': 'DeviceName', - 'SourcePort': 'LocalPort', - # 'SourcePortName': ?, - # 'DestinationIsIpv6': ?, - 'DestinationIp': 'RemoteIP', - 'DestinationHostname': 'RemoteUrl', - 'DestinationPort': 'RemotePort', - # 'DestinationPortName': ?, - }, - registry_events_table: { - # registry_*, Sysmon EventID 12 (create/delete), 13 (value set), 14 (key/value rename) -> DeviceRegistryEvents table, - 'EventType': 'EventType', - 'ProcessGuid': 'ActingProcessGuid', - 'ProcessId': 'ActingProcessId', - 'Image': 'ActingProcessName', - 'TargetObject': 'RegistryKey', - # 'NewName': ? - 'Details': 'RegistryValueData', - 'User': 'ActorUsername' - } -} - -## Generic catch-all field mappings for sysmon -> microsoft 365 defender fields that appear in most tables and -## haven't been mapped already -generic_field_mappings = { - 'EventType': 'EventType', - 'User': 'TargetUsername', - 'CommandLine': 'TargetProcessCommandLine', - 'Image': 'TargetProcessName', - 'SourceImage': 'TargetProcessName', - 'ProcessId': 'TargetProcessId', - 'md5': 'TargetProcessMD5', - #'sha1': 'InitiatingProcessSHA1', - 'sha256': 'TargetProcessSHA256', - 'ParentProcessId': 'ActingProcessId', - 'ParentCommandLine': 'ActingProcessCommandLine', - 'Company': 'TargetProcessFileCompany', - 'Description': 'TargetProcessFileDescription', - 'OriginalFileName': 'TargetProcessName', - 'Product': 'TargetProcessFileProduct', - 'Timestamp': 'TimeGenerated', - 'FolderPath': 'TargetProcessName', - 'ProcessCommandLine': 'TargetProcessCommandLine', -} - -# VALID FIELDS PER QUERY TABLE -## Will Implement field checking later once issue with removing fields is figured out, for now it fails the pipeline -## dict of {'table_name': [list, of, valid_fields]} for each table -valid_fields_per_table = { - process_events_table: [ - "TimeGenerated", - "TargetProcessGuid", - "TargetProcessId", - "TargetProcessName", - "TargetProcessFileVersion", - "TargetProcessFileDescription", - "TargetProcessFileProduct", - "CommandLine", - "User", - "TargetUserSessionGuid", - "TargetProcessIntegrityLevel", - "ActingProcessGuid", - "ActingProcessId", - "ActingProcessName", - "ActingProcessCommandLine", - "ActorUsername", - "TargetProcessSHA256", - "TargetProcessIMPHASH", - "TargetProcessMD5", - "EventType", - "EventStartTime", - "EventEndTime", - "EventCount", - "EventVendor", - "EventSchemaVersion", - "EventSchema", - "EventProduct", - "EventResult", - "DvcOs", - "TargetUserSessionId", - "TargetUsernameType", - "TargetUsername", - "TargetProcessCommandLine", - "TargetProcessCurrentDirectory", - "ActorUsernameType", - "EventOriginalType", - "Process", - "Dvc", - "Hash", - "DvcHostname", - "EventSourceName", - "TargetProcessFileCompany", - "TargetProcessFilename", - "HashType", - "Channel", - "Task", - "SourceComputerId", - "EventOriginId", - "TimeCollected" - ], - 'DeviceImageLoadEvents': ['Timestamp', 'DeviceId', 'DeviceName', 'ActionType', 'FileName', 'FolderPath', 'SHA1', - 'SHA256', 'MD5', 'FileSize', 'InitiatingProcessAccountDomain', - 'InitiatingProcessAccountName', 'InitiatingProcessAccountSid', - 'InitiatingProcessAccountUpn', 'InitiatingProcessAccountObjectId', - 'InitiatingProcessIntegrityLevel', 'InitiatingProcessTokenElevation', - 'InitiatingProcessSHA1', 'InitiatingProcessSHA256', 'InitiatingProcessMD5', - 'InitiatingProcessFileName', 'InitiatingProcessFileSize', - 'InitiatingProcessVersionInfoCompanyName', 'InitiatingProcessVersionInfoProductName', - 'InitiatingProcessVersionInfoProductVersion', - 'InitiatingProcessVersionInfoInternalFileName', - 'InitiatingProcessVersionInfoOriginalFileName', - 'InitiatingProcessVersionInfoFileDescription', 'InitiatingProcessId', - 'InitiatingProcessCommandLine', 'InitiatingProcessCreationTime', - 'InitiatingProcessFolderPath', 'InitiatingProcessParentId', - 'InitiatingProcessParentFileName', 'InitiatingProcessParentCreationTime', 'ReportId', - 'AppGuardContainerId'], - file_events_table: ['Timestamp', 'DeviceId', 'DeviceName', 'ActionType', 'FileName', 'FolderPath', 'SHA1', - 'SHA256', 'MD5', 'FileOriginUrl', 'FileOriginReferrerUrl', 'FileOriginIP', - 'PreviousFolderPath', 'PreviousFileName', 'FileSize', 'InitiatingProcessAccountDomain', - 'InitiatingProcessAccountName', 'InitiatingProcessAccountSid', 'InitiatingProcessAccountUpn', - 'InitiatingProcessAccountObjectId', 'InitiatingProcessMD5', 'InitiatingProcessSHA1', - 'InitiatingProcessSHA256', 'InitiatingProcessFolderPath', 'InitiatingProcessFileName', - 'InitiatingProcessFileSize', 'InitiatingProcessVersionInfoCompanyName', - 'InitiatingProcessVersionInfoProductName', 'InitiatingProcessVersionInfoProductVersion', - 'InitiatingProcessVersionInfoInternalFileName', 'InitiatingProcessVersionInfoOriginalFileName', - 'InitiatingProcessVersionInfoFileDescription', 'InitiatingProcessId', - 'InitiatingProcessCommandLine', 'InitiatingProcessCreationTime', - 'InitiatingProcessIntegrityLevel', 'InitiatingProcessTokenElevation', - 'InitiatingProcessParentId', 'InitiatingProcessParentFileName', - 'InitiatingProcessParentCreationTime', 'RequestProtocol', 'RequestSourceIP', - 'RequestSourcePort', 'RequestAccountName', 'RequestAccountDomain', 'RequestAccountSid', - 'ShareName', 'InitiatingProcessFileSize', 'SensitivityLabel', 'SensitivitySubLabel', - 'IsAzureInfoProtectionApplied', 'ReportId', 'AppGuardContainerId', 'AdditionalFields'], - registry_events_table: ['Timestamp', 'DeviceId', 'DeviceName', 'ActionType', 'RegistryKey', 'RegistryValueType', - 'RegistryValueName', 'RegistryValueData', 'PreviousRegistryKey', - 'PreviousRegistryValueName', 'PreviousRegistryValueData', 'InitiatingProcessAccountDomain', - 'InitiatingProcessAccountName', 'InitiatingProcessAccountSid', - 'InitiatingProcessAccountUpn', 'InitiatingProcessAccountObjectId', 'InitiatingProcessSHA1', - 'InitiatingProcessSHA256', 'InitiatingProcessMD5', 'InitiatingProcessFileName', - 'InitiatingProcessFileSize', 'InitiatingProcessVersionInfoCompanyName', - 'InitiatingProcessVersionInfoProductName', 'InitiatingProcessVersionInfoProductVersion', - 'InitiatingProcessVersionInfoInternalFileName', - 'InitiatingProcessVersionInfoOriginalFileName', - 'InitiatingProcessVersionInfoFileDescription', 'InitiatingProcessId', - 'InitiatingProcessCommandLine', 'InitiatingProcessCreationTime', - 'InitiatingProcessFolderPath', 'InitiatingProcessParentId', - 'InitiatingProcessParentFileName', 'InitiatingProcessParentCreationTime', - 'InitiatingProcessIntegrityLevel', 'InitiatingProcessTokenElevation', 'ReportId', - 'AppGuardContainerId'], - 'DeviceNetworkEvents': ['Timestamp', 'DeviceId', 'DeviceName', 'ActionType', 'RemoteIP', 'RemotePort', 'RemoteUrl', - 'LocalIP', 'LocalPort', 'Protocol', 'LocalIPType', 'RemoteIPType', 'InitiatingProcessSHA1', - 'InitiatingProcessSHA256', 'InitiatingProcessMD5', 'InitiatingProcessFileName', - 'InitiatingProcessFileSize', 'InitiatingProcessVersionInfoCompanyName', - 'InitiatingProcessVersionInfoProductName', 'InitiatingProcessVersionInfoProductVersion', - 'InitiatingProcessVersionInfoInternalFileName', - 'InitiatingProcessVersionInfoOriginalFileName', - 'InitiatingProcessVersionInfoFileDescription', 'InitiatingProcessId', - 'InitiatingProcessCommandLine', 'InitiatingProcessCreationTime', - 'InitiatingProcessFolderPath', 'InitiatingProcessParentFileName', - 'InitiatingProcessParentId', 'InitiatingProcessParentCreationTime', - 'InitiatingProcessAccountDomain', 'InitiatingProcessAccountName', - 'InitiatingProcessAccountSid', 'InitiatingProcessAccountUpn', - 'InitiatingProcessAccountObjectId', 'InitiatingProcessIntegrityLevel', - 'InitiatingProcessTokenElevation', 'ReportId', 'AppGuardContainerId', 'AdditionalFields']} - -# Mapping from ParentImage to InitiatingProcessParentFileName. Must be used alongside of ParentImageValueTransformation -parent_image_field_mapping = {'ParentImage': 'InitiatingProcessParentFileName'} - -# OTHER MAPPINGS -## useful for creating ProcessingItems() with list comprehension - -## Query Table names -> rule categories -table_to_category_mappings = { - process_events_table: ['process_creation'], - 'DeviceImageLoadEvents': ['image_load'], - file_events_table: ['file_access', 'file_change', 'file_delete', 'file_event', 'file_rename'], - registry_events_table: ['registry_add', 'registry_delete', 'registry_event', 'registry_set'], - 'DeviceNetworkEvents': ['network_connection'] -} - -## rule categories -> RuleConditions -category_to_conditions_mappings = { - 'process_creation': logsource_windows_process_creation(), - 'image_load': logsource_windows_image_load(), - 'file_access': logsource_windows_file_access(), - 'file_change': logsource_windows_file_change(), - 'file_delete': logsource_windows_file_delete(), - 'file_event': logsource_windows_file_event(), - 'file_rename': logsource_windows_file_rename(), - 'registry_add': logsource_windows_registry_add(), - 'registry_delete': logsource_windows_registry_delete(), - 'registry_event': logsource_windows_registry_event(), - 'registry_set': logsource_windows_registry_set(), - 'network_connection': logsource_windows_network_connection() -} - -# PROCESSING_ITEMS() -## ProcessingItems to set state key 'query_table' to use in backend -## i.e. $QueryTable$ | $rest_of_query$ -query_table_proc_items = [ - ProcessingItem( - identifier=f"microsoft_365_defender_set_query_table_{table_name}", - transformation=SetStateTransformation("query_table", table_name), - rule_conditions=[ - category_to_conditions_mappings[rule_category] for rule_category in rule_categories - ], - rule_condition_linking=any, - ) - for table_name, rule_categories in table_to_category_mappings.items() -] - -## Fieldmappings -fieldmappings_proc_items = [ - ProcessingItem( - identifier=f"microsoft_365_defender_fieldmappings_{table_name}", - transformation=FieldMappingTransformation(query_table_field_mappings[table_name]), - rule_conditions=[ - category_to_conditions_mappings[rule_category] for rule_category in rule_categories - ], - rule_condition_linking=any, - ) - for table_name, rule_categories in table_to_category_mappings.items() -] - -## Generic Fielp Mappings, keep this last -## Exclude any fields already mapped. For example, if process_creation events ProcessId has already -## been mapped to the same field name (ProcessId), we don't to remap it to InitiatingProcessId -generic_field_mappings_proc_item = [ProcessingItem( - identifier="microsoft_365_defender_fieldmappings_generic", - transformation=FieldMappingTransformation( - generic_field_mappings - ), - detection_item_conditions=[ - DetectionItemProcessingItemAppliedCondition(f"microsoft_365_defender_fieldmappings_{table_name}") - for table_name in table_to_category_mappings.keys() - ], - detection_item_condition_linking=any, - detection_item_condition_negation=True, -) -] - -## Field Value Replacements ProcessingItems -replacement_proc_items = [ - # Sysmon uses abbreviations in RegistryKey values, replace with full key names as the DeviceRegistryEvents schema - # expects them to be - # Note: Ensure this comes AFTER field mapping renames, as we're specifying DeviceRegistryEvent fields - # - # Do this one first, or else the HKLM only one will replace HKLM and mess up the regex - ProcessingItem( - identifier="microsoft_365_defender_registry_key_replace_currentcontrolset", - transformation=ReplaceStringTransformation(regex=r"(?i)(^HKLM\\SYSTEM\\CurrentControlSet)", - replacement=r"HKEY_LOCAL_MACHINE\\SYSTEM\\CurrentControlSet001"), - field_name_conditions=[IncludeFieldCondition(['RegistryKey', 'PreviousRegistryKey'])] - ), - ProcessingItem( - identifier="microsoft_365_defender_registry_key_replace_hklm", - transformation=ReplaceStringTransformation(regex=r"(?i)(^HKLM)", - replacement=r"HKEY_LOCAL_MACHINE"), - field_name_conditions=[IncludeFieldCondition(['RegistryKey', 'PreviousRegistryKey'])] - ), - ProcessingItem( - identifier="microsoft_365_defender_registry_key_replace_hku", - transformation=ReplaceStringTransformation(regex=r"(?i)(^HKU)", - replacement=r"HKEY_USERS"), - field_name_conditions=[IncludeFieldCondition(['RegistryKey', 'PreviousRegistryKey'])] - ), - ProcessingItem( - identifier="microsoft_365_defender_registry_key_replace_hkcr", - transformation=ReplaceStringTransformation(regex=r"(?i)(^HKCR)", - replacement=r"HKEY_LOCAL_MACHINE\\CLASSES"), - field_name_conditions=[IncludeFieldCondition(['RegistryKey', 'PreviousRegistryKey'])] - ), - ProcessingItem( - identifier="microsoft_365_defender_registry_actiontype_value", - transformation=RegistryActionTypeValueTransformation(), - field_name_conditions=[IncludeFieldCondition(['ActionType'])] - ), - # Extract Domain from Username fields - ProcessingItem( - identifier="microsoft_365_defender_domain_username_extract", - transformation=SplitDomainUserTransformation(), - field_name_conditions=[IncludeFieldCondition(["AccountName", "InitiatingProcessAccountName"])] - ), - ProcessingItem( - identifier="microsoft_365_defender_hashes_field_values", - transformation=HashesValuesTransformation(), - field_name_conditions=[IncludeFieldCondition(['Hashes'])] - ), - # Processing item to essentially ignore initiated field - ProcessingItem( - identifier="microsoft_365_defender_network_initiated_field", - transformation=DropDetectionItemTransformation(), - field_name_conditions=[IncludeFieldCondition(['Initiated'])], - rule_conditions=[LogsourceCondition(category='network_connection')], - ) -] - -# ParentImage -> InitiatingProcessParentFileName -parent_image_proc_items = [ - # First apply fieldmapping from ParentImage to InitiatingProcessParentFileName for non process-creation rules - ProcessingItem( - identifier="microsoft_365_defender_parent_image_fieldmapping", - transformation=FieldMappingTransformation(parent_image_field_mapping), - rule_conditions=[ - # Exclude process_creation events, there's direct field mapping in this schema table - LogsourceCondition(category='process_creation') - ], - rule_condition_negation=True - ), - # Second, extract the parent process name from the full path - ProcessingItem( - identifier="microsoft_365_defender_parent_image_name_value", - transformation=ParentImageValueTransformation(), - field_name_conditions=[ - IncludeFieldCondition(["InitiatingProcessParentFileName"]), - ], - rule_conditions=[ - # Exclude process_creation events, there's direct field mapping in this schema table - LogsourceCondition(category='process_creation') - ], - rule_condition_negation=True - ) - -] - -## Exceptions/Errors ProcessingItems -rule_error_proc_items = [ - # Category Not Supported - ProcessingItem( - identifier="microsoft_365_defender_unsupported_rule_category", - rule_condition_linking=any, - transformation=RuleFailureTransformation( - "Rule category not yet supported by the Microsoft 365 Defender Sigma backend." - ), - rule_condition_negation=True, - rule_conditions=[x for x in category_to_conditions_mappings.values()], - )] - -field_error_proc_items = [ - # Invalid fields per category - ProcessingItem( - identifier=f"microsoft_365_defender_unsupported_fields_{table_name}", - transformation=InvalidFieldTransformation( - f"Please use valid fields for the {table_name} table, or the following fields that have keymappings in this " - f"pipeline:\n" - # Combine field mappings for table and generic field mappings dicts, get the unique keys, add the Hashes field, sort it - f"{', '.join(sorted(set({**query_table_field_mappings[table_name], **generic_field_mappings}.keys()).union({'Hashes'})))}" - ), - field_name_conditions=[ - ExcludeFieldCondition(fields=table_fields + list(generic_field_mappings.keys()) + ['Hashes'])], - rule_conditions=[ - category_to_conditions_mappings[rule_category] - for rule_category in table_to_category_mappings[table_name] - ], - rule_condition_linking=any, - ) - for table_name, table_fields in valid_fields_per_table.items() -] - - -def sentinel_asim_pipeline(transform_parent_image: Optional[bool] = True) -> ProcessingPipeline: - """Pipeline for transformations for SigmaRules to use with the Sentinel ASIM Functions - - :param transform_parent_image: If True, the ParentImage field will be mapped to InitiatingProcessParentFileName, and - the parent process name in the ParentImage will be extracted and used. This is because the Microsoft 365 Defender - table schema does not contain a InitiatingProcessParentFolderPath field like it does for InitiatingProcessFolderPath. - i.e. ParentImage: C:\\Windows\\System32\\whoami.exe -> InitiatingProcessParentFileName: whoami.exe. - Defaults to True - :type transform_parent_image: Optional[bool] - - :return: ProcessingPipeline for Microsoft 365 Defender Backend - :rtype: ProcessingPipeline - """ - - pipeline_items = [ - *query_table_proc_items, - *fieldmappings_proc_items, - *generic_field_mappings_proc_item, - *replacement_proc_items, - *rule_error_proc_items, - *field_error_proc_items, - ] - - if transform_parent_image: - pipeline_items[4:4] = parent_image_proc_items - - return ProcessingPipeline( - name="Generic Log Sources to Windows 365 ASIM Transformation", - priority=10, - items=pipeline_items, - allowed_backends=frozenset(["kusto"]), - ) diff --git a/tests/test_backend_kusto.py b/tests/test_backend_kusto.py deleted file mode 100644 index 0811a0f..0000000 --- a/tests/test_backend_kusto.py +++ /dev/null @@ -1,297 +0,0 @@ -import pytest -from sigma.collection import SigmaCollection -from sigma.backends.kusto import KustoBackend -from sigma.pipelines.microsoft365defender import microsoft_365_defender_pipeline - - - -@pytest.fixture -def microsoft365defender_backend(): - return KustoBackend(processing_pipeline=microsoft_365_defender_pipeline()) - - -def test_microsoft365defender_and_expression(microsoft365defender_backend: KustoBackend): - assert microsoft365defender_backend.convert( - SigmaCollection.from_yaml(""" - title: Test - status: test - logsource: - category: process_creation - product: windows - detection: - sel: - CommandLine: valueA - User: valueB - condition: sel - """) - ) == ['DeviceProcessEvents\n| where ProcessCommandLine =~ "valueA" and AccountName =~ "valueB"'] - - -def test_microsoft365defender_or_expression(microsoft365defender_backend: KustoBackend): - assert microsoft365defender_backend.convert( - SigmaCollection.from_yaml(""" - title: Test - status: test - logsource: - category: process_creation - product: windows - detection: - sel1: - CommandLine: valueA - sel2: - User: valueB - condition: 1 of sel* - """) - ) == ['DeviceProcessEvents\n| where ProcessCommandLine =~ "valueA" or AccountName =~ "valueB"'] - - -def test_microsoft365defender_and_or_expression(microsoft365defender_backend: KustoBackend): - assert microsoft365defender_backend.convert( - SigmaCollection.from_yaml(""" - title: Test - status: test - logsource: - category: process_creation - product: windows - detection: - sel: - CommandLine: - - valueA1 - - valueA2 - ProcessId: - - valueB1 - - valueB2 - condition: sel - """) - ) == ['DeviceProcessEvents\n| where (ProcessCommandLine in~ ("valueA1", "valueA2")) and ' - '(ProcessId in~ ("valueB1", "valueB2"))'] - - -def test_microsoft365defender_or_and_expression(microsoft365defender_backend: KustoBackend): - assert microsoft365defender_backend.convert( - SigmaCollection.from_yaml(""" - title: Test - status: test - logsource: - category: process_creation - product: windows - detection: - sel1: - CommandLine: valueA1 - ProcessId: valueB1 - sel2: - CommandLine: valueA2 - ProcessId: valueB2 - condition: 1 of sel* - """) - ) == ['DeviceProcessEvents\n| where (ProcessCommandLine =~ "valueA1" and ProcessId =~ "valueB1") or ' - '(ProcessCommandLine =~ "valueA2" and ProcessId =~ "valueB2")'] - - -def test_microsoft365defender_in_expression(microsoft365defender_backend: KustoBackend): - assert microsoft365defender_backend.convert( - SigmaCollection.from_yaml(""" - title: Test - status: test - logsource: - category: process_creation - product: windows - detection: - sel: - CommandLine: - - valueA - - valueB - - valueC* - condition: sel - """) - ) == ['DeviceProcessEvents\n| where ProcessCommandLine in~ ("valueA", "valueB") or ' - 'ProcessCommandLine startswith "valueC"'] - - -def test_microsoft365defender_regex_query(microsoft365defender_backend: KustoBackend): - assert microsoft365defender_backend.convert( - SigmaCollection.from_yaml(""" - title: Test - status: test - logsource: - category: process_creation - product: windows - detection: - sel: - CommandLine|re: foo.*bar - ProcessId: foo - condition: sel - """) - ) == ['DeviceProcessEvents\n| where ProcessCommandLine matches regex "foo.*bar" and ProcessId =~ "foo"'] - - -def test_microsoft365defender_cidr_query(microsoft365defender_backend: KustoBackend): - assert microsoft365defender_backend.convert( - SigmaCollection.from_yaml(""" - title: Test - status: test - logsource: - category: network_connection - product: windows - detection: - sel: - SourceIp|cidr: 192.168.0.0/16 - condition: sel - """) - ) == ['DeviceNetworkEvents\n| where ipv4_is_in_range(LocalIP, "192.168.0.0/16")'] - - -def test_microsoft365defender_negation_basic(microsoft365defender_backend: KustoBackend): - assert microsoft365defender_backend.convert( - SigmaCollection.from_yaml(r""" - title: Test - status: test - logsource: - product: windows - category: process_creation - detection: - selection: - Image: - - '*\process.exe' - CommandLine: - - 'this' - filter: - CommandLine: - - 'notthis' - condition: selection and not filter - """) - ) == ['DeviceProcessEvents\n| where (FolderPath endswith "\\\\process.exe" and ' - 'ProcessCommandLine =~ "this") and ' - '(not(ProcessCommandLine =~ "notthis"))'] - - -def test_microsoft365defender_negation_contains(microsoft365defender_backend: KustoBackend): - assert microsoft365defender_backend.convert( - SigmaCollection.from_yaml(r""" - title: Test - status: test - logsource: - product: windows - category: process_creation - detection: - selection: - Image: - - '*\process.exe' - CommandLine: - - '*this*' - filter: - CommandLine: - - '*notthis*' - condition: selection and not filter - """) - ) == ['DeviceProcessEvents\n| where (FolderPath endswith "\\\\process.exe" and ' - 'ProcessCommandLine contains "this") and ' - '(not(ProcessCommandLine contains "notthis"))'] - - -def test_microsoft365defender_grouping(microsoft365defender_backend: KustoBackend): - assert microsoft365defender_backend.convert( - SigmaCollection.from_yaml(r""" - title: Net connection logic test - status: test - logsource: - category: network_connection - product: windows - detection: - selection: - Image: - - '*\powershell.exe' - - '*\pwsh.exe' - DestinationHostname: - - '*pastebin.com*' - - '*anothersite.com*' - condition: selection - """) - ) == ['DeviceNetworkEvents\n| where (InitiatingProcessFolderPath endswith "\\\\powershell.exe" or ' - 'InitiatingProcessFolderPath endswith "\\\\pwsh.exe") and (RemoteUrl contains ' - '"pastebin.com" or RemoteUrl contains "anothersite.com")'] - - -def test_microsoft365defender_escape_cmdline_slash(microsoft365defender_backend: KustoBackend): - assert microsoft365defender_backend.convert( - SigmaCollection.from_yaml(r""" - title: Delete All Scheduled Tasks - id: 220457c1-1c9f-4c2e-afe6-9598926222c1 - status: test - description: Detects the usage of schtasks with the delete flag and the asterisk symbol to delete all tasks from the schedule of the local computer, including tasks scheduled by other users. - references: - - https://docs.microsoft.com/en-us/windows-server/administration/windows-commands/schtasks-delete - author: Nasreddine Bencherchali (Nextron Systems) - date: 2022-09-09 - tags: - - attack.impact - - attack.t1489 - logsource: - category: process_creation - product: windows - detection: - selection: - Image|endswith: '\schtasks.exe' - CommandLine|contains|all: - - ' /delete ' - - '/tn \*' - - ' /f' - condition: selection - falsepositives: - - Unlikely - level: high - """) - ) == ['DeviceProcessEvents\n| where FolderPath endswith "\\\\schtasks.exe" and ' - '(ProcessCommandLine contains " /delete " and ' - 'ProcessCommandLine contains "/tn *" and ' - 'ProcessCommandLine contains " /f")'] - - -def test_microsoft365defender_cmdline_filters(microsoft365defender_backend: KustoBackend): - assert microsoft365defender_backend.convert( - SigmaCollection.from_yaml( - r""" - title: New Firewall Rule Added Via Netsh.EXE - id: cd5cfd80-aa5f-44c0-9c20-108c4ae12e3c - status: test - description: Detects the addition of a new rule to the Windows firewall via netsh - references: - - https://www.operationblockbuster.com/wp-content/uploads/2016/02/Operation-Blockbuster-RAT-and-Staging-Report.pdf - author: Markus Neis, Sander Wiebing - date: 2019-01-29 - modified: 2023-02-10 - tags: - - attack.defense_evasion - - attack.t1562.004 - - attack.s0246 - logsource: - category: process_creation - product: windows - detection: - selection_img: - - Image|endswith: '\netsh.exe' - - OriginalFileName: 'netsh.exe' - selection_cli: - CommandLine|contains|all: - - ' firewall ' - - ' add ' - filter_optional_dropbox: - CommandLine|contains: - - 'advfirewall firewall add rule name=Dropbox dir=in action=allow "program=?:\Program Files (x86)\Dropbox\Client\Dropbox.exe" enable=yes profile=Any' - - 'advfirewall firewall add rule name=Dropbox dir=in action=allow "program=?:\Program Files\Dropbox\Client\Dropbox.exe" enable=yes profile=Any' - condition: all of selection_* and not 1 of filter_optional_* - falsepositives: - - Legitimate administration activity - - Software installations - level: medium - """ - ) - ) == ['DeviceProcessEvents\n| where ((FolderPath endswith "\\\\netsh.exe" or ' - 'ProcessVersionInfoOriginalFileName =~ "netsh.exe") and ' - '(ProcessCommandLine contains " firewall " and ProcessCommandLine contains " add ")) and ' - '(not(((ProcessCommandLine contains "advfirewall firewall add rule name=Dropbox dir=in action=allow ' - '\\"program=" and ProcessCommandLine contains ":\\\\Program Files (x86)\\\\Dropbox\\\\Client\\\\Dropbox.exe\\" ' - 'enable=yes profile=Any") or (ProcessCommandLine contains "advfirewall firewall add rule name=Dropbox dir=in ' - 'action=allow \\"program=" and ProcessCommandLine contains ":\\\\Program Files\\\\Dropbox\\\\Client\\\\Dropbox.exe\\" ' - 'enable=yes profile=Any"))))' - ] diff --git a/tests/test_pipelines_microsoft365defender.py b/tests/test_pipelines_microsoft365defender.py deleted file mode 100644 index 5a79b04..0000000 --- a/tests/test_pipelines_microsoft365defender.py +++ /dev/null @@ -1,716 +0,0 @@ -import pytest -from sigma.exceptions import SigmaTransformationError -from sigma.pipelines.microsoft365defender.microsoft365defender import InvalidHashAlgorithmError - -from sigma.backends.kusto import KustoBackend -from sigma.collection import SigmaCollection -from sigma.pipelines.microsoft365defender import microsoft_365_defender_pipeline - - -def test_microsoft_365_defender_username_transformation(): - """Tests splitting username up into different fields if it includes a domain""" - assert KustoBackend(processing_pipeline=microsoft_365_defender_pipeline()).convert( - SigmaCollection.from_yaml(""" - title: Test - status: test - logsource: - category: process_creation - product: windows - detection: - sel1: - CommandLine: command1 - AccountName: username1 - sel2: - CommandLine: command2 - AccountName: domain2\\username2 - sel3: - CommandLine: command3 - InitiatingProcessAccountName: - - username3 - - domain4\\username4 - sel4: - AccountName: username5 - condition: any of sel* - """) - ) == ['DeviceProcessEvents\n| ' - 'where (ProcessCommandLine =~ "command1" and AccountName =~ "username1") or ' - '(ProcessCommandLine =~ "command2" and (AccountName =~ "username2" and AccountDomain =~ "domain2")) or ' - '(ProcessCommandLine =~ "command3" and (InitiatingProcessAccountName =~ "username3" or ' - '(InitiatingProcessAccountName =~ "username4" and InitiatingProcessAccountDomain =~ "domain4"))) or ' - 'AccountName =~ "username5"'] - - -def test_microsoft_365_defender_hashes_values_transformation(): - """Test for getting hash algo/value from Hashes field and creating new detection items from them""" - assert KustoBackend(processing_pipeline=microsoft_365_defender_pipeline()).convert( - SigmaCollection.from_yaml(""" - title: Test - status: test - logsource: - category: process_creation - product: windows - detection: - sel1: - Hashes: - - md5|e708864855f3bb69c4d9a213b9108b9f - - sha1|00ea1da4192a2030f9ae023de3b3143ed647bbab - - sha256|6bbb0da1891646e58eb3e6a63af3a6fc3c8eb5a0d44824cba581d2e14a0450cf - sel2: - Hashes: - - 0b49939d6415354c950b142a0b1e696a - - 4b2b79b6f371ca18f1216461cffeaddf6848a50e - - 8f16f88cfa1cf0d17c75403aa9614d806ebc00419763e0ecac3860decbcd9988 - - invalidhashvalue - condition: any of sel* - """) - ) == ['DeviceProcessEvents\n' - '| where (MD5 =~ "e708864855f3bb69c4d9a213b9108b9f" or SHA1 =~ "00ea1da4192a2030f9ae023de3b3143ed647bbab" ' - 'or SHA256 =~ "6bbb0da1891646e58eb3e6a63af3a6fc3c8eb5a0d44824cba581d2e14a0450cf") or ' - '(MD5 =~ "0b49939d6415354c950b142a0b1e696a" or SHA1 =~ "4b2b79b6f371ca18f1216461cffeaddf6848a50e" or ' - 'SHA256 =~ "8f16f88cfa1cf0d17c75403aa9614d806ebc00419763e0ecac3860decbcd9988")'] - - -def test_microsoft_365_defender_process_creation_simple(): - assert KustoBackend(processing_pipeline=microsoft_365_defender_pipeline()).convert( - SigmaCollection.from_yaml(""" - title: Test - status: test - logsource: - category: process_creation - product: windows - detection: - sel: - CommandLine: val1 - Image: val2 - condition: sel - """) - ) == ['DeviceProcessEvents\n| where ProcessCommandLine =~ "val1" and FolderPath =~ "val2"'] - - -def test_microsoft_365_defender_image_load_simple(): - assert KustoBackend(processing_pipeline=microsoft_365_defender_pipeline()).convert( - SigmaCollection.from_yaml(""" - title: Test - status: test - logsource: - category: image_load - product: windows - detection: - sel: - ImageLoaded: val1 - sha1: val2 - condition: sel - """) - ) == ['DeviceImageLoadEvents\n| where FolderPath =~ "val1" and SHA1 =~ "val2"'] - - -def test_microsoft_365_defender_file_access_simple(): - assert KustoBackend(processing_pipeline=microsoft_365_defender_pipeline()).convert( - SigmaCollection.from_yaml(""" - title: Test - status: test - logsource: - category: file_access - product: windows - detection: - sel: - TargetFilename: val1 - Image: val2 - condition: sel - """) - ) == ['DeviceFileEvents\n| where FolderPath =~ "val1" and InitiatingProcessFolderPath =~ "val2"'] - - -def test_microsoft_365_defender_file_change_simple(): - assert KustoBackend(processing_pipeline=microsoft_365_defender_pipeline()).convert( - SigmaCollection.from_yaml(""" - title: Test - status: test - logsource: - category: file_change - product: windows - detection: - sel: - TargetFilename: val1 - Image: val2 - condition: sel - """) - ) == ['DeviceFileEvents\n| where FolderPath =~ "val1" and InitiatingProcessFolderPath =~ "val2"'] - - -def test_microsoft_365_defender_file_delete_simple(): - assert KustoBackend(processing_pipeline=microsoft_365_defender_pipeline()).convert( - SigmaCollection.from_yaml(""" - title: Test - status: test - logsource: - category: file_delete - product: windows - detection: - sel: - TargetFilename: val1 - Image: val2 - condition: sel - """) - ) == ['DeviceFileEvents\n| where FolderPath =~ "val1" and InitiatingProcessFolderPath =~ "val2"'] - - -def test_microsoft_365_defender_file_event_simple(): - assert KustoBackend(processing_pipeline=microsoft_365_defender_pipeline()).convert( - SigmaCollection.from_yaml(""" - title: Test - status: test - logsource: - category: file_change - product: windows - detection: - sel: - TargetFilename: val1 - Image: val2 - condition: sel - """) - ) == ['DeviceFileEvents\n| where FolderPath =~ "val1" and InitiatingProcessFolderPath =~ "val2"'] - - -def test_microsoft_365_defender_file_rename_simple(): - assert KustoBackend(processing_pipeline=microsoft_365_defender_pipeline()).convert( - SigmaCollection.from_yaml(""" - title: Test - status: test - logsource: - category: file_rename - product: windows - detection: - sel: - TargetFilename: val1 - Image: val2 - condition: sel - """) - ) == ['DeviceFileEvents\n| where FolderPath =~ "val1" and InitiatingProcessFolderPath =~ "val2"'] - - -def test_microsoft_365_defender_registry_add_simple(): - assert KustoBackend(processing_pipeline=microsoft_365_defender_pipeline()).convert( - SigmaCollection.from_yaml(""" - title: Test - status: test - logsource: - category: registry_add - product: windows - detection: - sel: - Image: val1 - TargetObject: val2 - condition: sel - """) - ) == ['DeviceRegistryEvents\n| where InitiatingProcessFolderPath =~ "val1" and RegistryKey =~ "val2"'] - - -def test_microsoft_365_defender_registry_delete_simple(): - assert KustoBackend(processing_pipeline=microsoft_365_defender_pipeline()).convert( - SigmaCollection.from_yaml(""" - title: Test - status: test - logsource: - category: registry_delete - product: windows - detection: - sel: - Image: val1 - TargetObject: val2 - condition: sel - """) - ) == ['DeviceRegistryEvents\n| where InitiatingProcessFolderPath =~ "val1" and RegistryKey =~ "val2"'] - - -def test_microsoft_365_defender_registry_event_simple(): - assert KustoBackend(processing_pipeline=microsoft_365_defender_pipeline()).convert( - SigmaCollection.from_yaml(""" - title: Test - status: test - logsource: - category: registry_event - product: windows - detection: - sel: - Image: val1 - TargetObject: val2 - condition: sel - """) - ) == ['DeviceRegistryEvents\n| where InitiatingProcessFolderPath =~ "val1" and RegistryKey =~ "val2"'] - - -def test_microsoft_365_defender_registry_set_simple(): - assert KustoBackend(processing_pipeline=microsoft_365_defender_pipeline()).convert( - SigmaCollection.from_yaml(""" - title: Test - status: test - logsource: - category: registry_set - product: windows - detection: - sel: - Image: val1 - TargetObject: val2 - condition: sel - """) - ) == ['DeviceRegistryEvents\n| where InitiatingProcessFolderPath =~ "val1" and RegistryKey =~ "val2"'] - - -def test_microsoft_365_defender_process_creation_field_mapping(): - assert KustoBackend(processing_pipeline=microsoft_365_defender_pipeline()).convert( - SigmaCollection.from_yaml(""" - title: Test - status: test - logsource: - category: process_creation - product: windows - detection: - sel: - Image: C:\\Path\\to\\notmalware.exe - FileVersion: 1 - Description: A Description - Product: pySigma - Company: AttackIQ - OriginalFileName: malware.exe - ProcessId: 2 - CommandLine: definitely not malware - User: heyitsmeyourbrother - IntegrityLevel: 1 - sha1: a123123123 - sha256: a123123123 - md5: a123123123 - ParentProcessId: 1 - ParentImage: C:\\Windows\\Temp\\freemoney.pdf - ParentCommandLine: freemoney.pdf test exe please ignore - ParentUser: heyitsmeyourparent - - condition: sel - """) - ) == ['DeviceProcessEvents\n| ' - 'where FolderPath =~ "C:\\\\Path\\\\to\\\\notmalware.exe" and ' - 'ProcessVersionInfoProductVersion == 1 and ' - 'ProcessVersionInfoFileDescription =~ "A Description" and ' - 'ProcessVersionInfoProductName =~ "pySigma" and ' - 'ProcessVersionInfoCompanyName =~ "AttackIQ" and ' - 'ProcessVersionInfoOriginalFileName =~ "malware.exe" and ' - 'ProcessId == 2 and ' - 'ProcessCommandLine =~ "definitely not malware" and ' - 'AccountName =~ "heyitsmeyourbrother" and ' - 'ProcessIntegrityLevel == 1 and ' - 'SHA1 =~ "a123123123" and ' - 'SHA256 =~ "a123123123" and ' - 'MD5 =~ "a123123123" and ' - 'InitiatingProcessId == 1 and ' - 'InitiatingProcessFolderPath =~ "C:\\\\Windows\\\\Temp\\\\freemoney.pdf" and ' - 'InitiatingProcessCommandLine =~ "freemoney.pdf test exe please ignore" and ' - 'InitiatingProcessAccountName =~ "heyitsmeyourparent"'] - - -def test_microsoft_365_defender_image_load_field_mapping(): - assert KustoBackend(processing_pipeline=microsoft_365_defender_pipeline()).convert( - SigmaCollection.from_yaml(""" - title: Test - status: test - logsource: - category: image_load - product: windows - detection: - sel: - ProcessId: 1 - Image: C:\\Temp\\notmalware.exe - ImageLoaded: C:\\Temp\\definitelynotmalware.exe - FileVersion: 1 - Description: A Description - Product: A Product - Company: AttackIQ - OriginalFileName: freemoney.pdf.exe - md5: e708864855f3bb69c4d9a213b9108b9f - sha1: 00ea1da4192a2030f9ae023de3b3143ed647bbab - sha256: 6bbb0da1891646e58eb3e6a63af3a6fc3c8eb5a0d44824cba581d2e14a0450cf - User: username - - condition: sel - """) - ) == ['DeviceImageLoadEvents\n| ' - 'where InitiatingProcessId == 1 and InitiatingProcessFolderPath =~ "C:\\\\Temp\\\\notmalware.exe" and ' - 'FolderPath =~ "C:\\\\Temp\\\\definitelynotmalware.exe" and InitiatingProcessVersionInfoProductVersion == 1 ' - 'and InitiatingProcessVersionInfoFileDescription =~ "A Description" and ' - 'InitiatingProcessVersionInfoProductName =~ "A Product" and ' - 'InitiatingProcessVersionInfoCompanyName =~ "AttackIQ" and ' - 'InitiatingProcessVersionInfoOriginalFileName =~ "freemoney.pdf.exe" and ' - 'MD5 =~ "e708864855f3bb69c4d9a213b9108b9f" and SHA1 =~ "00ea1da4192a2030f9ae023de3b3143ed647bbab" and ' - 'SHA256 =~ "6bbb0da1891646e58eb3e6a63af3a6fc3c8eb5a0d44824cba581d2e14a0450cf" and ' - 'InitiatingProcessAccountName =~ "username"'] - - -def test_microsoft_365_defender_file_event_field_mapping(): - assert KustoBackend(processing_pipeline=microsoft_365_defender_pipeline()).convert( - SigmaCollection.from_yaml(""" - title: Test - status: test - logsource: - category: file_event - product: windows - detection: - sel: - ProcessId: 1 - Image: C:\\Path\\To\\process.exe - TargetFilename: C:\\Temp\\passwords.txt - User: username - md5: e708864855f3bb69c4d9a213b9108b9f - sha1: 00ea1da4192a2030f9ae023de3b3143ed647bbab - sha256: 6bbb0da1891646e58eb3e6a63af3a6fc3c8eb5a0d44824cba581d2e14a0450cf - - condition: sel - """) - ) == ['DeviceFileEvents\n| ' - 'where InitiatingProcessId == 1 and InitiatingProcessFolderPath =~ "C:\\\\Path\\\\To\\\\process.exe" and ' - 'FolderPath =~ "C:\\\\Temp\\\\passwords.txt" and RequestAccountName =~ "username" and ' - 'MD5 =~ "e708864855f3bb69c4d9a213b9108b9f" and SHA1 =~ "00ea1da4192a2030f9ae023de3b3143ed647bbab" and ' - 'SHA256 =~ "6bbb0da1891646e58eb3e6a63af3a6fc3c8eb5a0d44824cba581d2e14a0450cf"'] - - -def test_microsoft_365_defender_registry_event_field_mapping(): - assert KustoBackend(processing_pipeline=microsoft_365_defender_pipeline()).convert( - SigmaCollection.from_yaml(""" - title: Test - status: test - logsource: - category: registry_set - product: windows - detection: - sel: - EventType: CreateKey - ProcessId: 1 - Image: C:\\Temp\\reg.exe - TargetObject: HKEY_LOCAL_MACHINE\\SYSTEM\\ControlSet001\\services\\TrustedInstaller - Details: attackiq - User: username - condition: sel - """) - ) == ['DeviceRegistryEvents\n| ' - 'where ActionType =~ "RegistryKeyCreated" and InitiatingProcessId == 1 and ' - 'InitiatingProcessFolderPath =~ "C:\\\\Temp\\\\reg.exe" and ' - 'RegistryKey =~ "HKEY_LOCAL_MACHINE\\\\SYSTEM\\\\ControlSet001\\\\services\\\\TrustedInstaller" and ' - 'RegistryValueData =~ "attackiq" and InitiatingProcessAccountName =~ "username"'] - - -def test_microsoft_365_defender_network_connection_field_mapping(): - assert KustoBackend(processing_pipeline=microsoft_365_defender_pipeline()).convert( - SigmaCollection.from_yaml(""" - title: Test - status: test - logsource: - category: network_connection - product: windows - detection: - sel: - ProcessId: 1 - Image: C:\\Temp\\notcobaltstrike.exe - User: admin - Protocol: TCP - SourceIp: 127.0.0.1 - SourcePort: 12345 - DestinationIp: 1.2.3.4 - DestinationPort: 50050 - DestinationHostname: notanatp.net - condition: sel - """) - ) == ['DeviceNetworkEvents\n| ' - 'where InitiatingProcessId == 1 and ' - 'InitiatingProcessFolderPath =~ "C:\\\\Temp\\\\notcobaltstrike.exe" and ' - 'InitiatingProcessAccountName =~ "admin" and Protocol =~ "TCP" and LocalIP =~ "127.0.0.1" and ' - 'LocalPort == 12345 and RemoteIP =~ "1.2.3.4" and RemotePort == 50050 and ' - 'RemoteUrl =~ "notanatp.net"'] - - -def test_microsoft_365_defender_network_connection_cidr(): - assert KustoBackend(processing_pipeline=microsoft_365_defender_pipeline()).convert( - SigmaCollection.from_yaml(""" - title: Test - status: test - logsource: - category: network_connection - product: windows - detection: - sel: - SourceIp|cidr: '10.10.0.0/24' - DestinationIp|cidr: '10.11.0.0/24' - condition: sel - """) - ) == ['DeviceNetworkEvents\n| ' - 'where ipv4_is_in_range(LocalIP, "10.10.0.0/24") and ipv4_is_in_range(RemoteIP, "10.11.0.0/24")'] - - -def test_microsoft_365_defender_pipeline_registrykey_replacements(): - assert KustoBackend(processing_pipeline=microsoft_365_defender_pipeline()).convert( - SigmaCollection.from_yaml(""" - title: Test - status: test - logsource: - category: registry_event - product: windows - detection: - sel1: - RegistryKey: HKLM\\TestKey1 - PreviousRegistryKey: HKLM\\TestKey1 - sel2: - RegistryKey: HKU\\TestKey2 - PreviousRegistryKey: HKU\\TestKey2 - sel3: - RegistryKey: HKLM\\System\\CurrentControlSet\\TestKey3 - PreviousRegistryKey: HKLM\\System\\CurrentControlSet\\TestKey3 - sel4: - RegistryKey: hkcr\\TestKey4 - PreviousRegistryKey: hkcr\\TestKey4 - condition: any of sel* - """) - ) == [ - 'DeviceRegistryEvents\n| where (RegistryKey =~ "HKEY_LOCAL_MACHINE\\\\TestKey1" and ' - 'PreviousRegistryKey =~ "HKEY_LOCAL_MACHINE\\\\TestKey1") or ' - '(RegistryKey =~ "HKEY_USERS\\\\TestKey2" and PreviousRegistryKey =~ "HKEY_USERS\\\\TestKey2") or ' - '(RegistryKey =~ "HKEY_LOCAL_MACHINE\\\\SYSTEM\\\\CurrentControlSet001\\\\TestKey3" and PreviousRegistryKey =~ "HKEY_LOCAL_MACHINE\\\\SYSTEM\\\\CurrentControlSet001\\\\TestKey3") or ' - '(RegistryKey =~ "HKEY_LOCAL_MACHINE\\\\CLASSES\\\\TestKey4" and PreviousRegistryKey =~ "HKEY_LOCAL_MACHINE\\\\CLASSES\\\\TestKey4")'] - - -def test_microsoft_365_defender_pipeline_registry_actiontype_replacements(): - assert KustoBackend(processing_pipeline=microsoft_365_defender_pipeline()).convert( - SigmaCollection.from_yaml(""" - title: Test - status: test - logsource: - category: registry_event - product: windows - detection: - sel1: - ActionType: CreateKey - sel2: - ActionType: DeleteKey - sel3: - ActionType: SetValue - sel4: - ActionType: RenameKey - condition: any of sel* - """) - ) == [ - 'DeviceRegistryEvents\n| ' - 'where ActionType =~ "RegistryKeyCreated" or ' - '(ActionType in~ ("RegistryKeyDeleted", "RegistryValueDeleted")) or ' - 'ActionType =~ "RegistryValueSet" or ' - '(ActionType in~ ("RegistryValueSet", "RegistryKeyCreated"))'] - - -def test_microsoft_365_defender_pipeline_valid_hash_in_list(): - assert KustoBackend(processing_pipeline=microsoft_365_defender_pipeline()).convert( - SigmaCollection.from_yaml(""" - title: test - status: test - logsource: - category: process_creation - product: windows - detection: - sel: - Hashes: - - MD5=6444f8a34e99b8f7d9647de66aabe516 - - IMPHASH=dfd6aa3f7b2b1035b76b718f1ddc689f - - IMPHASH=1a6cca4d5460b1710a12dea39e4a592c - condition: sel - """) - ) == [ 'DeviceProcessEvents\n| ' - 'where MD5 =~ "6444f8a34e99b8f7d9647de66aabe516"'] - - - -def test_microsoft_365_defender_pipeline_generic_field(): - """Tests""" - assert KustoBackend(processing_pipeline=microsoft_365_defender_pipeline()).convert( - SigmaCollection.from_yaml(""" - title: Test - status: test - logsource: - category: file_event - product: windows - detection: - sel1: - CommandLine: whoami - ProcessId: 1 - condition: any of sel* - """) - ) == [ - 'DeviceFileEvents\n| ' - 'where InitiatingProcessCommandLine =~ "whoami" and InitiatingProcessId == 1'] - - -def test_microsoft_365_defender_pipeline_parent_image(): - """Tests ParentImage for non-process-creation rules""" - assert KustoBackend(processing_pipeline=microsoft_365_defender_pipeline()).convert( - SigmaCollection.from_yaml(""" - title: Test - status: test - logsource: - category: file_event - product: windows - detection: - sel1: - Image: C:\\Windows\\System32\\whoami.exe - ParentImage: C:\\Windows\\System32\\cmd.exe - condition: any of sel* - """) - ) == [ - 'DeviceFileEvents\n| ' - 'where InitiatingProcessFolderPath =~ "C:\\\\Windows\\\\System32\\\\whoami.exe" and ' - 'InitiatingProcessParentFileName =~ "cmd.exe"'] - - -def test_microsoft_365_defender_pipeline_parent_image_false(): - """Tests passing transfer_parent_image=False to the pipeline""" - with pytest.raises(SigmaTransformationError, - match="Invalid SigmaDetectionItem field name encountered.*DeviceFileEvents"): - KustoBackend(processing_pipeline=microsoft_365_defender_pipeline(transform_parent_image=False)).convert( - SigmaCollection.from_yaml(""" - title: Test - status: test - logsource: - category: file_event - product: windows - detection: - sel1: - Image: C:\\Windows\\System32\\whoami.exe - ParentImage: C:\\Windows\\System32\\cmd.exe - condition: any of sel* - """) - ) - - -def test_microsoft_365_defender_pipeline_unsupported_rule_type(): - with pytest.raises(SigmaTransformationError, - match="Rule category not yet supported by the Microsoft 365 Defender Sigma backend."): - KustoBackend(processing_pipeline=microsoft_365_defender_pipeline()).convert( - SigmaCollection.from_yaml(""" - title: test - status: test - logsource: - category: invalid_category - product: invalid_product - detection: - sel: - field: whatever - condition: sel - """) - ) - - -def test_microsoft_365_defender_pipeline_unsupported_field_process_creation(): - with pytest.raises(SigmaTransformationError, - match="Invalid SigmaDetectionItem field name encountered.*DeviceProcessEvents"): - KustoBackend(processing_pipeline=microsoft_365_defender_pipeline()).convert( - SigmaCollection.from_yaml(""" - title: test - status: test - logsource: - category: process_creation - product: windows - detection: - sel: - CommandLine: whatever - InvalidField: forever - condition: sel - """) - ) - - -def test_microsoft_365_defender_pipeline_unsupported_field_file_event(): - with pytest.raises(SigmaTransformationError, - match="Invalid SigmaDetectionItem field name encountered.*DeviceFileEvents"): - KustoBackend(processing_pipeline=microsoft_365_defender_pipeline()).convert( - SigmaCollection.from_yaml(""" - title: test - status: test - logsource: - category: file_access - product: windows - detection: - sel: - FileName: whatever - InvalidField: forever - condition: sel - """) - ) - - -def test_microsoft_365_defender_pipeline_unsupported_field_image_load(): - with pytest.raises(SigmaTransformationError, - match="Invalid SigmaDetectionItem field name encountered.*DeviceImageLoadEvents"): - KustoBackend(processing_pipeline=microsoft_365_defender_pipeline()).convert( - SigmaCollection.from_yaml(""" - title: test - status: test - logsource: - category: image_load - product: windows - detection: - sel: - CommandLine: whatever - InvalidField: forever - condition: sel - """) - ) - - -def test_microsoft_365_defender_pipeline_unsupported_field_registry_event(): - with pytest.raises(SigmaTransformationError, - match="Invalid SigmaDetectionItem field name encountered.*DeviceRegistryEvents"): - KustoBackend(processing_pipeline=microsoft_365_defender_pipeline()).convert( - SigmaCollection.from_yaml(""" - title: test - status: test - logsource: - category: registry_add - product: windows - detection: - sel: - CommandLine: whatever - InvalidField: forever - condition: sel - """) - ) - - -def test_microsoft_365_defender_pipeline_unsupported_field_network_connection(): - with pytest.raises(SigmaTransformationError, - match="Invalid SigmaDetectionItem field name encountered.*DeviceNetworkEvents"): - KustoBackend(processing_pipeline=microsoft_365_defender_pipeline()).convert( - SigmaCollection.from_yaml(""" - title: test - status: test - logsource: - category: network_connection - product: windows - detection: - sel: - CommandLine: whatever - InvalidField: forever - condition: sel - """) - ) - -def test_microsoft_365_defender_pipeline_no_valid_hashes(): - with pytest.raises(InvalidHashAlgorithmError): - KustoBackend(processing_pipeline=microsoft_365_defender_pipeline()).convert( - SigmaCollection.from_yaml(""" - title: test - status: test - logsource: - category: network_connection - product: windows - detection: - sel: - Hashes: - - IMPHASH=6444f8a34e99b8f7d9647de66aabe516 - - IMPHASH=dfd6aa3f7b2b1035b76b718f1ddc689f - - IMPHASH=1a6cca4d5460b1710a12dea39e4a592c - condition: sel - """) - ) -