From 697a2d01e7a447ec0655404891a598b6314e79a3 Mon Sep 17 00:00:00 2001 From: Stephen Lincoln Date: Thu, 3 Oct 2024 14:35:10 -0400 Subject: [PATCH] Fixed query table not applied to SigmaRules, added testing for SigmaRules --- sigma/pipelines/azuremonitor/azuremonitor.py | 7 +- sigma/pipelines/kusto_common/conditions.py | 17 + .../pipelines/kusto_common/postprocessing.py | 19 + sigma/pipelines/microsoftxdr/microsoftxdr.py | 4 +- sigma/pipelines/sentinelasim/sentinelasim.py | 6 +- tests/test_pipelines_azuremonitor.py | 423 ++--- tests/test_pipelines_microsoftxdr.py | 1676 ++++++++--------- tests/test_pipelines_sentinelasim.py | 430 ++--- 8 files changed, 1236 insertions(+), 1346 deletions(-) create mode 100644 sigma/pipelines/kusto_common/conditions.py create mode 100644 sigma/pipelines/kusto_common/postprocessing.py diff --git a/sigma/pipelines/azuremonitor/azuremonitor.py b/sigma/pipelines/azuremonitor/azuremonitor.py index fa1298d..6e9ee67 100644 --- a/sigma/pipelines/azuremonitor/azuremonitor.py +++ b/sigma/pipelines/azuremonitor/azuremonitor.py @@ -1,7 +1,9 @@ from typing import Optional +from sigma.pipelines.kusto_common.postprocessing import ( + PrependQueryTablePostprocessingItem, +) from sigma.processing.conditions import ( - # DetectionItemProcessingItemAppliedCondition, ExcludeFieldCondition, IncludeFieldCondition, LogsourceCondition, @@ -16,7 +18,6 @@ ) from ..kusto_common.errors import InvalidFieldTransformation -from ..kusto_common.finalization import QueryTableFinalizer from ..kusto_common.schema import create_schema from ..kusto_common.transformations import ( DynamicFieldMappingTransformation, @@ -215,5 +216,5 @@ def azure_monitor_pipeline(query_table: Optional[str] = None) -> ProcessingPipel priority=10, items=pipeline_items, allowed_backends=frozenset(["kusto"]), - finalizers=[QueryTableFinalizer()], + postprocessing_items=[PrependQueryTablePostprocessingItem], ) diff --git a/sigma/pipelines/kusto_common/conditions.py b/sigma/pipelines/kusto_common/conditions.py new file mode 100644 index 0000000..913f004 --- /dev/null +++ b/sigma/pipelines/kusto_common/conditions.py @@ -0,0 +1,17 @@ +from dataclasses import dataclass +from typing import Union + +from sigma.correlations import SigmaCorrelationRule +from sigma.processing.conditions import RuleProcessingCondition +from sigma.rule import SigmaRule + + +@dataclass +class QueryTableSetCondition(RuleProcessingCondition): + def match( + self, + pipeline: "sigma.processing.pipeline.ProcessingPipeline", # noqa: F821 # type: ignore + rule: Union[SigmaRule, SigmaCorrelationRule], + ) -> bool: + """Match condition on Sigma rule.""" + return pipeline.state.get("query_table", None) is not None diff --git a/sigma/pipelines/kusto_common/postprocessing.py b/sigma/pipelines/kusto_common/postprocessing.py new file mode 100644 index 0000000..bd8609f --- /dev/null +++ b/sigma/pipelines/kusto_common/postprocessing.py @@ -0,0 +1,19 @@ +from sigma.processing.pipeline import QueryPostprocessingItem +from sigma.processing.postprocessing import QueryPostprocessingTransformation +from sigma.rule import SigmaRule + +from ..kusto_common.conditions import QueryTableSetCondition + + +class PrependQueryTablePostprocessingItem(QueryPostprocessingTransformation): + def apply(self, pipeline: "sigma.processing.pipeline.ProcessingPipeline", rule: SigmaRule, query: str) -> str: # type: ignore # noqa: F821 + return f"{pipeline.state['query_table']}\n| where {query}" + + +PrependQueryTablePostprocessingItem = QueryPostprocessingItem( + identifier="kusto_prepend_query_table", + transformation=PrependQueryTablePostprocessingItem(), + rule_conditions=[ + QueryTableSetCondition(), + ], +) diff --git a/sigma/pipelines/microsoftxdr/microsoftxdr.py b/sigma/pipelines/microsoftxdr/microsoftxdr.py index d96665f..cb12bb9 100644 --- a/sigma/pipelines/microsoftxdr/microsoftxdr.py +++ b/sigma/pipelines/microsoftxdr/microsoftxdr.py @@ -17,7 +17,7 @@ ) from ..kusto_common.errors import InvalidFieldTransformation -from ..kusto_common.finalization import QueryTableFinalizer +from ..kusto_common.postprocessing import PrependQueryTablePostprocessingItem from ..kusto_common.schema import create_schema from ..kusto_common.transformations import ( DynamicFieldMappingTransformation, @@ -253,5 +253,5 @@ def microsoft_xdr_pipeline( priority=10, items=pipeline_items, allowed_backends=frozenset(["kusto"]), - finalizers=[QueryTableFinalizer()], + postprocessing_items=[PrependQueryTablePostprocessingItem], ) diff --git a/sigma/pipelines/sentinelasim/sentinelasim.py b/sigma/pipelines/sentinelasim/sentinelasim.py index c34421e..0c27fc2 100644 --- a/sigma/pipelines/sentinelasim/sentinelasim.py +++ b/sigma/pipelines/sentinelasim/sentinelasim.py @@ -1,5 +1,8 @@ from typing import Optional +from sigma.pipelines.kusto_common.postprocessing import ( + PrependQueryTablePostprocessingItem, +) from sigma.processing.conditions import ( DetectionItemProcessingItemAppliedCondition, ExcludeFieldCondition, @@ -16,7 +19,6 @@ ) from ..kusto_common.errors import InvalidFieldTransformation -from ..kusto_common.finalization import QueryTableFinalizer from ..kusto_common.schema import create_schema from ..kusto_common.transformations import ( DynamicFieldMappingTransformation, @@ -219,5 +221,5 @@ def sentinel_asim_pipeline( priority=10, items=pipeline_items, allowed_backends=frozenset(["kusto"]), - finalizers=[QueryTableFinalizer()], + postprocessing_items=[PrependQueryTablePostprocessingItem], ) diff --git a/tests/test_pipelines_azuremonitor.py b/tests/test_pipelines_azuremonitor.py index 31264e5..c0d639d 100644 --- a/tests/test_pipelines_azuremonitor.py +++ b/tests/test_pipelines_azuremonitor.py @@ -4,235 +4,214 @@ from sigma.collection import SigmaCollection from sigma.exceptions import SigmaTransformationError from sigma.pipelines.azuremonitor import azure_monitor_pipeline - - -def test_azure_monitor_process_creation_field_mapping(): - assert ( - KustoBackend(processing_pipeline=azure_monitor_pipeline()).convert( - SigmaCollection.from_yaml( - """ - title: Test Process Creation - status: test - logsource: - category: process_creation - product: windows - detection: - sel: - Image: C:\\Windows\\System32\\cmd.exe - CommandLine: whoami - User: SYSTEM - ProcessId: 1234 - condition: sel - """ - ) - ) - == [ - 'SecurityEvent\n| where NewProcessName =~ "C:\\\\Windows\\\\System32\\\\cmd.exe" and CommandLine =~ "whoami" and SubjectUserName =~ "SYSTEM" and NewProcessId == 1234' - ] - ) - - -def test_azure_monitor_network_connection_field_mapping(): - assert ( - KustoBackend(processing_pipeline=azure_monitor_pipeline()).convert( - SigmaCollection.from_yaml( - """ - title: Test Network Connection - status: test - logsource: - category: network_connection - product: windows - detection: - sel: - DestinationIp: 8.8.8.8 - DestinationPort: 53 - SourcePort: 12345 - condition: sel - """ - ) - ) - == ['SecurityEvent\n| where DestinationIp =~ "8.8.8.8" and DestinationPort == 53 and SourcePort == 12345'] - ) - - -def test_azure_monitor_registry_event_field_mapping(): - assert ( - KustoBackend(processing_pipeline=azure_monitor_pipeline()).convert( - SigmaCollection.from_yaml( - """ - title: Test Registry Event - status: test - logsource: - category: registry_event - product: windows - detection: - sel: - EventID: 13 - TargetObject: HKLM\\Software\\Microsoft\\Windows\\CurrentVersion\\Run - condition: sel - """ - ) - ) - == [ - 'SecurityEvent\n| where EventID == 13 and ObjectName =~ "HKEY_LOCAL_MACHINE\\\\Software\\\\Microsoft\\\\Windows\\\\CurrentVersion\\\\Run"' - ] - ) - - -def test_azure_monitor_file_event_field_mapping(): - assert ( - KustoBackend(processing_pipeline=azure_monitor_pipeline()).convert( - SigmaCollection.from_yaml( - """ - title: Test File Event - status: test - logsource: - category: file_event - product: windows - detection: - sel: - TargetFilename: C:\\suspicious\\file.exe - Image: C:\\Windows\\System32\\cmd.exe - condition: sel - """ - ) - ) - == [ - 'SecurityEvent\n| where ObjectName =~ "C:\\\\suspicious\\\\file.exe" and NewProcessName =~ "C:\\\\Windows\\\\System32\\\\cmd.exe"' - ] - ) - - -def test_azure_monitor_hashes_transformation(): - assert ( - KustoBackend(processing_pipeline=azure_monitor_pipeline()).convert( - SigmaCollection.from_yaml( - """ - title: Test Hashes - status: test - logsource: - category: process_creation - product: windows - detection: - sel: - Hashes: - - md5=1234567890abcdef1234567890abcdef - - sha1=1234567890abcdef1234567890abcdef12345678 - - sha256=1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef - condition: sel - """ - ) - ) - == [ - 'SecurityEvent\n| where FileHash in~ ("1234567890abcdef1234567890abcdef", "1234567890abcdef1234567890abcdef12345678", "1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef")' - ] - ) - - -def test_azure_monitor_registry_key_replacement(): - assert ( - KustoBackend(processing_pipeline=azure_monitor_pipeline()).convert( - SigmaCollection.from_yaml( - """ - title: Test Registry Key Replacement - status: test - logsource: - category: registry_event - product: windows - detection: - sel: - TargetObject: - - HKLM\\Software\\Microsoft\\Windows\\CurrentVersion\\Run - - HKU\\Software\\Microsoft\\Windows\\CurrentVersion\\Run - - HKCR\\Software\\Microsoft\\Windows\\CurrentVersion\\Run - condition: sel - """ - ) - ) - == [ - 'SecurityEvent\n| where ObjectName in~ ("HKEY_LOCAL_MACHINE\\\\Software\\\\Microsoft\\\\Windows\\\\CurrentVersion\\\\Run", "HKEY_USERS\\\\Software\\\\Microsoft\\\\Windows\\\\CurrentVersion\\\\Run", "HKEY_LOCAL_MACHINE\\\\CLASSES\\\\Software\\\\Microsoft\\\\Windows\\\\CurrentVersion\\\\Run")' - ] - ) - - -def test_azure_monitor_unsupported_category(): +from sigma.rule import SigmaRule + + +@pytest.fixture +def azure_backend(): + return KustoBackend(processing_pipeline=azure_monitor_pipeline()) + + +def test_azure_monitor_process_creation_field_mapping(azure_backend): + yaml_rule = """ + title: Test Process Creation + status: test + logsource: + category: process_creation + product: windows + detection: + sel: + Image: C:\\Windows\\System32\\cmd.exe + CommandLine: whoami + User: SYSTEM + ProcessId: 1234 + condition: sel + """ + expected_result = [ + 'SecurityEvent\n| where NewProcessName =~ "C:\\\\Windows\\\\System32\\\\cmd.exe" and CommandLine =~ "whoami" and SubjectUserName =~ "SYSTEM" and NewProcessId == 1234' + ] + + assert azure_backend.convert(SigmaCollection.from_yaml(yaml_rule)) == expected_result + assert azure_backend.convert_rule(SigmaRule.from_yaml(yaml_rule)) == expected_result + + +def test_azure_monitor_network_connection_field_mapping(azure_backend): + yaml_rule = """ + title: Test Network Connection + status: test + logsource: + category: network_connection + product: windows + detection: + sel: + DestinationIp: 8.8.8.8 + DestinationPort: 53 + SourcePort: 12345 + condition: sel + """ + expected_result = [ + 'SecurityEvent\n| where DestinationIp =~ "8.8.8.8" and DestinationPort == 53 and SourcePort == 12345' + ] + + assert azure_backend.convert(SigmaCollection.from_yaml(yaml_rule)) == expected_result + assert azure_backend.convert_rule(SigmaRule.from_yaml(yaml_rule)) == expected_result + + +def test_azure_monitor_registry_event_field_mapping(azure_backend): + yaml_rule = """ + title: Test Registry Event + status: test + logsource: + category: registry_event + product: windows + detection: + sel: + EventID: 13 + TargetObject: HKLM\\Software\\Microsoft\\Windows\\CurrentVersion\\Run + condition: sel + """ + expected_result = [ + 'SecurityEvent\n| where EventID == 13 and ObjectName =~ "HKEY_LOCAL_MACHINE\\\\Software\\\\Microsoft\\\\Windows\\\\CurrentVersion\\\\Run"' + ] + + assert azure_backend.convert(SigmaCollection.from_yaml(yaml_rule)) == expected_result + assert azure_backend.convert_rule(SigmaRule.from_yaml(yaml_rule)) == expected_result + + +def test_azure_monitor_file_event_field_mapping(azure_backend): + yaml_rule = """ + title: Test File Event + status: test + logsource: + category: file_event + product: windows + detection: + sel: + TargetFilename: C:\\suspicious\\file.exe + Image: C:\\Windows\\System32\\cmd.exe + condition: sel + """ + expected_result = [ + 'SecurityEvent\n| where ObjectName =~ "C:\\\\suspicious\\\\file.exe" and NewProcessName =~ "C:\\\\Windows\\\\System32\\\\cmd.exe"' + ] + + assert azure_backend.convert(SigmaCollection.from_yaml(yaml_rule)) == expected_result + assert azure_backend.convert_rule(SigmaRule.from_yaml(yaml_rule)) == expected_result + + +def test_azure_monitor_hashes_transformation(azure_backend): + yaml_rule = """ + title: Test Hashes + status: test + logsource: + category: process_creation + product: windows + detection: + sel: + Hashes: + - md5=1234567890abcdef1234567890abcdef + - sha1=1234567890abcdef1234567890abcdef12345678 + - sha256=1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef + condition: sel + """ + expected_result = [ + 'SecurityEvent\n| where FileHash in~ ("1234567890abcdef1234567890abcdef", "1234567890abcdef1234567890abcdef12345678", "1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef")' + ] + + assert azure_backend.convert(SigmaCollection.from_yaml(yaml_rule)) == expected_result + assert azure_backend.convert_rule(SigmaRule.from_yaml(yaml_rule)) == expected_result + + +def test_azure_monitor_registry_key_replacement(azure_backend): + yaml_rule = """ + title: Test Registry Key Replacement + status: test + logsource: + category: registry_event + product: windows + detection: + sel: + TargetObject: + - HKLM\\Software\\Microsoft\\Windows\\CurrentVersion\\Run + - HKU\\Software\\Microsoft\\Windows\\CurrentVersion\\Run + - HKCR\\Software\\Microsoft\\Windows\\CurrentVersion\\Run + condition: sel + """ + expected_result = [ + 'SecurityEvent\n| where ObjectName in~ ("HKEY_LOCAL_MACHINE\\\\Software\\\\Microsoft\\\\Windows\\\\CurrentVersion\\\\Run", "HKEY_USERS\\\\Software\\\\Microsoft\\\\Windows\\\\CurrentVersion\\\\Run", "HKEY_LOCAL_MACHINE\\\\CLASSES\\\\Software\\\\Microsoft\\\\Windows\\\\CurrentVersion\\\\Run")' + ] + + assert azure_backend.convert(SigmaCollection.from_yaml(yaml_rule)) == expected_result + assert azure_backend.convert_rule(SigmaRule.from_yaml(yaml_rule)) == expected_result + + +def test_azure_monitor_unsupported_category(azure_backend): + yaml_rule = """ + title: Test Unsupported Category + status: test + logsource: + category: unsupported_category + product: windows + detection: + sel: + Field: value + condition: sel + """ with pytest.raises(SigmaTransformationError, match="Unable to determine table name for category"): - KustoBackend(processing_pipeline=azure_monitor_pipeline()).convert( - SigmaCollection.from_yaml( - """ - title: Test Unsupported Category - status: test - logsource: - category: unsupported_category - product: windows - detection: - sel: - Field: value - condition: sel - """ - ) - ) - - -def test_azure_monitor_invalid_field(): + azure_backend.convert(SigmaCollection.from_yaml(yaml_rule)) + + +def test_azure_monitor_invalid_field(azure_backend): + yaml_rule = """ + title: Test Invalid Field + status: test + logsource: + category: process_creation + product: windows + detection: + sel: + InvalidField: value + condition: sel + """ with pytest.raises( SigmaTransformationError, match="Invalid SigmaDetectionItem field name encountered.*SecurityEvent" ): - KustoBackend(processing_pipeline=azure_monitor_pipeline()).convert( - SigmaCollection.from_yaml( - """ - title: Test Invalid Field - status: test - logsource: - category: process_creation - product: windows - detection: - sel: - InvalidField: value - condition: sel - """ - ) - ) + azure_backend.convert(SigmaCollection.from_yaml(yaml_rule)) def test_azure_monitor_custom_query_table(): - assert ( - KustoBackend(processing_pipeline=azure_monitor_pipeline(query_table="CustomTable")).convert( - SigmaCollection.from_yaml( - """ - title: Test Custom Query Table - status: test - logsource: - category: process_creation - product: windows - detection: - sel: - CommandLine: whoami - condition: sel - """ - ) - ) - == ['CustomTable\n| where CommandLine =~ "whoami"'] - ) + yaml_rule = """ + title: Test Custom Query Table + status: test + logsource: + category: process_creation + product: windows + detection: + sel: + CommandLine: whoami + condition: sel + """ + expected_result = ['CustomTable\n| where CommandLine =~ "whoami"'] + + custom_backend = KustoBackend(processing_pipeline=azure_monitor_pipeline(query_table="CustomTable")) + assert custom_backend.convert(SigmaCollection.from_yaml(yaml_rule)) == expected_result + assert custom_backend.convert_rule(SigmaRule.from_yaml(yaml_rule)) == expected_result def test_azure_monitor_pipeline_custom_table_invalid_category(): - """Tests to ensure custom table names override category table name mappings and field name mappings""" - assert ( - KustoBackend(processing_pipeline=azure_monitor_pipeline(query_table="SecurityEvent")).convert( - SigmaCollection.from_yaml( - """ - title: Test - status: test - logsource: - product: windows - category: blah - detection: - sel: - Image: actuallyafileevent.exe - condition: sel - """ - ) - ) - == ["SecurityEvent\n| " 'where NewProcessName =~ "actuallyafileevent.exe"'] - ) + yaml_rule = """ + title: Test + status: test + logsource: + product: windows + category: blah + detection: + sel: + Image: actuallyafileevent.exe + condition: sel + """ + expected_result = ["SecurityEvent\n| " 'where NewProcessName =~ "actuallyafileevent.exe"'] + + custom_backend = KustoBackend(processing_pipeline=azure_monitor_pipeline(query_table="SecurityEvent")) + assert custom_backend.convert(SigmaCollection.from_yaml(yaml_rule)) == expected_result + assert custom_backend.convert_rule(SigmaRule.from_yaml(yaml_rule)) == expected_result diff --git a/tests/test_pipelines_microsoftxdr.py b/tests/test_pipelines_microsoftxdr.py index 8e6a6c3..fb28b87 100644 --- a/tests/test_pipelines_microsoftxdr.py +++ b/tests/test_pipelines_microsoftxdr.py @@ -6,935 +6,821 @@ from sigma.pipelines.kusto_common.errors import InvalidHashAlgorithmError from sigma.pipelines.microsoft365defender import microsoft_365_defender_pipeline from sigma.pipelines.microsoftxdr import microsoft_xdr_pipeline +from sigma.rule import SigmaRule -def test_microsoft_xdr_pipeline_alias(): - assert microsoft_xdr_pipeline() == microsoft_365_defender_pipeline() - - -def test_microsoft_xdr_username_transformation(): - """Tests splitting username up into different fields if it includes a domain""" - assert ( - KustoBackend(processing_pipeline=microsoft_xdr_pipeline()).convert( - SigmaCollection.from_yaml( - """ - title: Test - status: test - logsource: - category: process_creation - product: windows - detection: - sel1: - CommandLine: command1 - AccountName: username1 - sel2: - CommandLine: command2 - AccountName: domain2\\username2 - sel3: - CommandLine: command3 - InitiatingProcessAccountName: - - username3 - - domain4\\username4 - sel4: - AccountName: username5 - condition: any of sel* - """ - ) - ) - == [ - "DeviceProcessEvents\n| " - 'where (ProcessCommandLine =~ "command1" and AccountName =~ "username1") or ' - '(ProcessCommandLine =~ "command2" and (AccountName =~ "username2" and AccountDomain =~ "domain2")) or ' - '(ProcessCommandLine =~ "command3" and (InitiatingProcessAccountName =~ "username3" or ' - '(InitiatingProcessAccountName =~ "username4" and InitiatingProcessAccountDomain =~ "domain4"))) or ' - 'AccountName =~ "username5"' - ] - ) - - -def test_microsoft_xdr_hashes_values_transformation(): - """Test for getting hash algo/value from Hashes field and creating new detection items from them""" - assert ( - KustoBackend(processing_pipeline=microsoft_xdr_pipeline()).convert( - SigmaCollection.from_yaml( - """ - title: Test - status: test - logsource: - category: process_creation - product: windows - detection: - sel1: - Hashes: - - md5|e708864855f3bb69c4d9a213b9108b9f - - sha1|00ea1da4192a2030f9ae023de3b3143ed647bbab - - sha256|6bbb0da1891646e58eb3e6a63af3a6fc3c8eb5a0d44824cba581d2e14a0450cf - sel2: - Hashes: - - 0b49939d6415354c950b142a0b1e696a - - 4b2b79b6f371ca18f1216461cffeaddf6848a50e - - 8f16f88cfa1cf0d17c75403aa9614d806ebc00419763e0ecac3860decbcd9988 - - invalidhashvalue - condition: any of sel* - """ - ) - ) - == [ - "DeviceProcessEvents\n" - '| where (MD5 =~ "e708864855f3bb69c4d9a213b9108b9f" or SHA1 =~ "00ea1da4192a2030f9ae023de3b3143ed647bbab" ' - 'or SHA256 =~ "6bbb0da1891646e58eb3e6a63af3a6fc3c8eb5a0d44824cba581d2e14a0450cf") or ' - '(MD5 =~ "0b49939d6415354c950b142a0b1e696a" or SHA1 =~ "4b2b79b6f371ca18f1216461cffeaddf6848a50e" or ' - 'SHA256 =~ "8f16f88cfa1cf0d17c75403aa9614d806ebc00419763e0ecac3860decbcd9988")' - ] - ) - - -def test_microsoft_xdr_process_creation_simple(): - assert ( - KustoBackend(processing_pipeline=microsoft_xdr_pipeline()).convert( - SigmaCollection.from_yaml( - """ - title: Test - status: test - logsource: - category: process_creation - product: windows - detection: - sel: - CommandLine: val1 - Image: val2 - condition: sel - """ - ) - ) - == ['DeviceProcessEvents\n| where ProcessCommandLine =~ "val1" and FolderPath =~ "val2"'] - ) - - -def test_microsoft_xdr_image_load_simple(): - assert ( - KustoBackend(processing_pipeline=microsoft_xdr_pipeline()).convert( - SigmaCollection.from_yaml( - """ - title: Test - status: test - logsource: - category: image_load - product: windows - detection: - sel: - ImageLoaded: val1 - sha1: val2 - condition: sel - """ - ) - ) - == ['DeviceImageLoadEvents\n| where FolderPath =~ "val1" and SHA1 =~ "val2"'] - ) - - -def test_microsoft_xdr_file_access_simple(): - assert ( - KustoBackend(processing_pipeline=microsoft_xdr_pipeline()).convert( - SigmaCollection.from_yaml( - """ - title: Test - status: test - logsource: - category: file_access - product: windows - detection: - sel: - TargetFilename: val1 - Image: val2 - condition: sel - """ - ) - ) - == ['DeviceFileEvents\n| where FolderPath =~ "val1" and InitiatingProcessFolderPath =~ "val2"'] - ) - +@pytest.fixture +def xdr_backend(): + return KustoBackend(processing_pipeline=microsoft_xdr_pipeline()) -def test_microsoft_xdr_file_change_simple(): - assert ( - KustoBackend(processing_pipeline=microsoft_xdr_pipeline()).convert( - SigmaCollection.from_yaml( - """ - title: Test - status: test - logsource: - category: file_change - product: windows - detection: - sel: - TargetFilename: val1 - Image: val2 - condition: sel - """ - ) - ) - == ['DeviceFileEvents\n| where FolderPath =~ "val1" and InitiatingProcessFolderPath =~ "val2"'] - ) - - -def test_microsoft_xdr_file_delete_simple(): - assert ( - KustoBackend(processing_pipeline=microsoft_xdr_pipeline()).convert( - SigmaCollection.from_yaml( - """ - title: Test - status: test - logsource: - category: file_delete - product: windows - detection: - sel: - TargetFilename: val1 - Image: val2 - condition: sel - """ - ) - ) - == ['DeviceFileEvents\n| where FolderPath =~ "val1" and InitiatingProcessFolderPath =~ "val2"'] - ) - - -def test_microsoft_xdr_file_event_simple(): - assert ( - KustoBackend(processing_pipeline=microsoft_xdr_pipeline()).convert( - SigmaCollection.from_yaml( - """ - title: Test - status: test - logsource: - category: file_change - product: windows - detection: - sel: - TargetFilename: val1 - Image: val2 - condition: sel - """ - ) - ) - == ['DeviceFileEvents\n| where FolderPath =~ "val1" and InitiatingProcessFolderPath =~ "val2"'] - ) - - -def test_microsoft_xdr_file_rename_simple(): - assert ( - KustoBackend(processing_pipeline=microsoft_xdr_pipeline()).convert( - SigmaCollection.from_yaml( - """ - title: Test - status: test - logsource: - category: file_rename - product: windows - detection: - sel: - TargetFilename: val1 - Image: val2 - condition: sel - """ - ) - ) - == ['DeviceFileEvents\n| where FolderPath =~ "val1" and InitiatingProcessFolderPath =~ "val2"'] - ) - - -def test_microsoft_xdr_registry_add_simple(): - assert ( - KustoBackend(processing_pipeline=microsoft_xdr_pipeline()).convert( - SigmaCollection.from_yaml( - """ - title: Test - status: test - logsource: - category: registry_add - product: windows - detection: - sel: - Image: val1 - TargetObject: val2 - condition: sel - """ - ) - ) - == ['DeviceRegistryEvents\n| where InitiatingProcessFolderPath =~ "val1" and RegistryKey =~ "val2"'] - ) - - -def test_microsoft_xdr_registry_delete_simple(): - assert ( - KustoBackend(processing_pipeline=microsoft_xdr_pipeline()).convert( - SigmaCollection.from_yaml( - """ - title: Test - status: test - logsource: - category: registry_delete - product: windows - detection: - sel: - Image: val1 - TargetObject: val2 - condition: sel - """ - ) - ) - == ['DeviceRegistryEvents\n| where InitiatingProcessFolderPath =~ "val1" and RegistryKey =~ "val2"'] - ) - - -def test_microsoft_xdr_registry_event_simple(): - assert ( - KustoBackend(processing_pipeline=microsoft_xdr_pipeline()).convert( - SigmaCollection.from_yaml( - """ - title: Test - status: test - logsource: - category: registry_event - product: windows - detection: - sel: - Image: val1 - TargetObject: val2 - condition: sel - """ - ) - ) - == ['DeviceRegistryEvents\n| where InitiatingProcessFolderPath =~ "val1" and RegistryKey =~ "val2"'] - ) - - -def test_microsoft_xdr_registry_set_simple(): - assert ( - KustoBackend(processing_pipeline=microsoft_xdr_pipeline()).convert( - SigmaCollection.from_yaml( - """ - title: Test - status: test - logsource: - category: registry_set - product: windows - detection: - sel: - Image: val1 - TargetObject: val2 - condition: sel - """ - ) - ) - == ['DeviceRegistryEvents\n| where InitiatingProcessFolderPath =~ "val1" and RegistryKey =~ "val2"'] - ) - - -def test_microsoft_xdr_process_creation_field_mapping(): - assert ( - KustoBackend(processing_pipeline=microsoft_xdr_pipeline()).convert( - SigmaCollection.from_yaml( - """ - title: Test - status: test - logsource: - category: process_creation - product: windows - detection: - sel: - Image: C:\\Path\\to\\notmalware.exe - FileVersion: 1 - Description: A Description - Product: pySigma - Company: AttackIQ - OriginalFileName: malware.exe - ProcessId: 2 - CommandLine: definitely not malware - User: heyitsmeyourbrother - IntegrityLevel: 1 - sha1: a123123123 - sha256: a123123123 - md5: a123123123 - ParentProcessId: 1 - ParentImage: C:\\Windows\\Temp\\freemoney.pdf - ParentCommandLine: freemoney.pdf test exe please ignore - ParentUser: heyitsmeyourparent - - condition: sel - """ - ) - ) - == [ - "DeviceProcessEvents\n| " - 'where FolderPath =~ "C:\\\\Path\\\\to\\\\notmalware.exe" and ' - "ProcessVersionInfoProductVersion == 1 and " - 'ProcessVersionInfoFileDescription =~ "A Description" and ' - 'ProcessVersionInfoProductName =~ "pySigma" and ' - 'ProcessVersionInfoCompanyName =~ "AttackIQ" and ' - 'ProcessVersionInfoOriginalFileName =~ "malware.exe" and ' - "ProcessId == 2 and " - 'ProcessCommandLine =~ "definitely not malware" and ' - 'AccountName =~ "heyitsmeyourbrother" and ' - "ProcessIntegrityLevel == 1 and " - 'SHA1 =~ "a123123123" and ' - 'SHA256 =~ "a123123123" and ' - 'MD5 =~ "a123123123" and ' - "InitiatingProcessId == 1 and " - 'InitiatingProcessFolderPath =~ "C:\\\\Windows\\\\Temp\\\\freemoney.pdf" and ' - 'InitiatingProcessCommandLine =~ "freemoney.pdf test exe please ignore" and ' - 'InitiatingProcessAccountName =~ "heyitsmeyourparent"' - ] - ) - - -def test_microsoft_xdr_image_load_field_mapping(): - assert ( - KustoBackend(processing_pipeline=microsoft_xdr_pipeline()).convert( - SigmaCollection.from_yaml( - """ - title: Test - status: test - logsource: - category: image_load - product: windows - detection: - sel: - ProcessId: 1 - Image: C:\\Temp\\notmalware.exe - ImageLoaded: C:\\Temp\\definitelynotmalware.exe - FileVersion: 1 - Description: A Description - Product: A Product - Company: AttackIQ - OriginalFileName: freemoney.pdf.exe - md5: e708864855f3bb69c4d9a213b9108b9f - sha1: 00ea1da4192a2030f9ae023de3b3143ed647bbab - sha256: 6bbb0da1891646e58eb3e6a63af3a6fc3c8eb5a0d44824cba581d2e14a0450cf - User: username - - condition: sel - """ - ) - ) - == [ - "DeviceImageLoadEvents\n| " - 'where InitiatingProcessId == 1 and InitiatingProcessFolderPath =~ "C:\\\\Temp\\\\notmalware.exe" and ' - 'FolderPath =~ "C:\\\\Temp\\\\definitelynotmalware.exe" and InitiatingProcessVersionInfoProductVersion == 1 ' - 'and InitiatingProcessVersionInfoFileDescription =~ "A Description" and ' - 'InitiatingProcessVersionInfoProductName =~ "A Product" and ' - 'InitiatingProcessVersionInfoCompanyName =~ "AttackIQ" and ' - 'InitiatingProcessVersionInfoOriginalFileName =~ "freemoney.pdf.exe" and ' - 'MD5 =~ "e708864855f3bb69c4d9a213b9108b9f" and SHA1 =~ "00ea1da4192a2030f9ae023de3b3143ed647bbab" and ' - 'SHA256 =~ "6bbb0da1891646e58eb3e6a63af3a6fc3c8eb5a0d44824cba581d2e14a0450cf" and ' - 'InitiatingProcessAccountName =~ "username"' - ] - ) - -def test_microsoft_xdr_file_event_field_mapping(): - assert ( - KustoBackend(processing_pipeline=microsoft_xdr_pipeline()).convert( - SigmaCollection.from_yaml( - """ - title: Test - status: test - logsource: - category: file_event - product: windows - detection: - sel: - ProcessId: 1 - Image: C:\\Path\\To\\process.exe - TargetFilename: C:\\Temp\\passwords.txt - User: username - md5: e708864855f3bb69c4d9a213b9108b9f - sha1: 00ea1da4192a2030f9ae023de3b3143ed647bbab - sha256: 6bbb0da1891646e58eb3e6a63af3a6fc3c8eb5a0d44824cba581d2e14a0450cf - - condition: sel - """ - ) - ) - == [ - "DeviceFileEvents\n| " - 'where InitiatingProcessId == 1 and InitiatingProcessFolderPath =~ "C:\\\\Path\\\\To\\\\process.exe" and ' - 'FolderPath =~ "C:\\\\Temp\\\\passwords.txt" and RequestAccountName =~ "username" and ' - 'MD5 =~ "e708864855f3bb69c4d9a213b9108b9f" and SHA1 =~ "00ea1da4192a2030f9ae023de3b3143ed647bbab" and ' - 'SHA256 =~ "6bbb0da1891646e58eb3e6a63af3a6fc3c8eb5a0d44824cba581d2e14a0450cf"' - ] - ) - - -def test_microsoft_xdr_registry_event_field_mapping(): - assert ( - KustoBackend(processing_pipeline=microsoft_xdr_pipeline()).convert( - SigmaCollection.from_yaml( - """ - title: Test - status: test - logsource: - category: registry_set - product: windows - detection: - sel: - EventType: CreateKey - ProcessId: 1 - Image: C:\\Temp\\reg.exe - TargetObject: HKEY_LOCAL_MACHINE\\SYSTEM\\ControlSet001\\services\\TrustedInstaller - Details: attackiq - User: username - condition: sel - """ - ) - ) - == [ - "DeviceRegistryEvents\n| " - 'where ActionType =~ "RegistryKeyCreated" and InitiatingProcessId == 1 and ' - 'InitiatingProcessFolderPath =~ "C:\\\\Temp\\\\reg.exe" and ' - 'RegistryKey =~ "HKEY_LOCAL_MACHINE\\\\SYSTEM\\\\ControlSet001\\\\services\\\\TrustedInstaller" and ' - 'RegistryValueData =~ "attackiq" and InitiatingProcessAccountName =~ "username"' - ] - ) - - -def test_microsoft_xdr_network_connection_field_mapping(): - assert ( - KustoBackend(processing_pipeline=microsoft_xdr_pipeline()).convert( - SigmaCollection.from_yaml( - """ - title: Test - status: test - logsource: - category: network_connection - product: windows - detection: - sel: - ProcessId: 1 - Image: C:\\Temp\\notcobaltstrike.exe - User: admin - Protocol: TCP - SourceIp: 127.0.0.1 - SourcePort: 12345 - DestinationIp: 1.2.3.4 - DestinationPort: 50050 - DestinationHostname: notanatp.net - condition: sel - """ - ) - ) - == [ - "DeviceNetworkEvents\n| " - "where InitiatingProcessId == 1 and " - 'InitiatingProcessFolderPath =~ "C:\\\\Temp\\\\notcobaltstrike.exe" and ' - 'InitiatingProcessAccountName =~ "admin" and Protocol =~ "TCP" and LocalIP =~ "127.0.0.1" and ' - 'LocalPort == 12345 and RemoteIP =~ "1.2.3.4" and RemotePort == 50050 and ' - 'RemoteUrl =~ "notanatp.net"' - ] - ) - - -def test_microsoft_xdr_network_connection_cidr(): - assert ( - KustoBackend(processing_pipeline=microsoft_xdr_pipeline()).convert( - SigmaCollection.from_yaml( - """ - title: Test - status: test - logsource: - category: network_connection - product: windows - detection: - sel: - SourceIp|cidr: '10.10.0.0/24' - DestinationIp|cidr: '10.11.0.0/24' - condition: sel - """ - ) - ) - == [ - "DeviceNetworkEvents\n| " - 'where ipv4_is_in_range(LocalIP, "10.10.0.0/24") and ipv4_is_in_range(RemoteIP, "10.11.0.0/24")' - ] - ) - - -def test_microsoft_xdr_pipeline_registrykey_replacements(): - assert ( - KustoBackend(processing_pipeline=microsoft_xdr_pipeline()).convert( - SigmaCollection.from_yaml( - """ - title: Test - status: test - logsource: - category: registry_event - product: windows - detection: - sel1: - RegistryKey: HKLM\\TestKey1 - PreviousRegistryKey: HKLM\\TestKey1 - sel2: - RegistryKey: HKU\\TestKey2 - PreviousRegistryKey: HKU\\TestKey2 - sel3: - RegistryKey: HKLM\\System\\CurrentControlSet\\TestKey3 - PreviousRegistryKey: HKLM\\System\\CurrentControlSet\\TestKey3 - sel4: - RegistryKey: hkcr\\TestKey4 - PreviousRegistryKey: hkcr\\TestKey4 - condition: any of sel* - """ - ) - ) - == [ - 'DeviceRegistryEvents\n| where (RegistryKey =~ "HKEY_LOCAL_MACHINE\\\\TestKey1" and ' - 'PreviousRegistryKey =~ "HKEY_LOCAL_MACHINE\\\\TestKey1") or ' - '(RegistryKey =~ "HKEY_USERS\\\\TestKey2" and PreviousRegistryKey =~ "HKEY_USERS\\\\TestKey2") or ' - '(RegistryKey =~ "HKEY_LOCAL_MACHINE\\\\SYSTEM\\\\CurrentControlSet001\\\\TestKey3" and PreviousRegistryKey =~ "HKEY_LOCAL_MACHINE\\\\SYSTEM\\\\CurrentControlSet001\\\\TestKey3") or ' - '(RegistryKey =~ "HKEY_LOCAL_MACHINE\\\\CLASSES\\\\TestKey4" and PreviousRegistryKey =~ "HKEY_LOCAL_MACHINE\\\\CLASSES\\\\TestKey4")' - ] - ) - - -def test_microsoft_xdr_pipeline_registry_actiontype_replacements(): - assert ( - KustoBackend(processing_pipeline=microsoft_xdr_pipeline()).convert( - SigmaCollection.from_yaml( - """ - title: Test - status: test - logsource: - category: registry_event - product: windows - detection: - sel1: - ActionType: CreateKey - sel2: - ActionType: DeleteKey - sel3: - ActionType: SetValue - sel4: - ActionType: RenameKey - condition: any of sel* - """ - ) - ) - == [ - "DeviceRegistryEvents\n| " - 'where ActionType =~ "RegistryKeyCreated" or ' - '(ActionType in~ ("RegistryKeyDeleted", "RegistryValueDeleted")) or ' - 'ActionType =~ "RegistryValueSet" or ' - '(ActionType in~ ("RegistryValueSet", "RegistryKeyCreated"))' - ] - ) - - -def test_microsoft_xdr_pipeline_valid_hash_in_list(): - assert ( - KustoBackend(processing_pipeline=microsoft_xdr_pipeline()).convert( - SigmaCollection.from_yaml( - """ - title: test - status: test - logsource: - category: process_creation - product: windows - detection: - sel: - Hashes: - - MD5=6444f8a34e99b8f7d9647de66aabe516 - - IMPHASH=dfd6aa3f7b2b1035b76b718f1ddc689f - - IMPHASH=1a6cca4d5460b1710a12dea39e4a592c - condition: sel - """ - ) - ) - == ["DeviceProcessEvents\n| " 'where MD5 =~ "6444f8a34e99b8f7d9647de66aabe516"'] - ) - - -def test_microsoft_xdr_pipeline_generic_field(): - """Tests""" - assert ( - KustoBackend(processing_pipeline=microsoft_xdr_pipeline()).convert( - SigmaCollection.from_yaml( - """ - title: Test - status: test - logsource: - category: file_event - product: windows - detection: - sel1: - CommandLine: whoami - ProcessId: 1 - condition: any of sel* - """ - ) - ) - == ["DeviceFileEvents\n| " 'where InitiatingProcessCommandLine =~ "whoami" and InitiatingProcessId == 1'] - ) - - -def test_microsoft_xdr_pipeline_parent_image(): - """Tests ParentImage for non-process-creation rules""" - assert ( - KustoBackend(processing_pipeline=microsoft_xdr_pipeline()).convert( - SigmaCollection.from_yaml( - """ - title: Test - status: test - logsource: - category: file_event - product: windows - detection: - sel1: - Image: C:\\Windows\\System32\\whoami.exe - ParentImage: C:\\Windows\\System32\\cmd.exe - condition: any of sel* - """ - ) - ) - == [ - "DeviceFileEvents\n| " - 'where InitiatingProcessFolderPath =~ "C:\\\\Windows\\\\System32\\\\whoami.exe" and ' - 'InitiatingProcessParentFileName =~ "cmd.exe"' - ] - ) +def test_microsoft_xdr_pipeline_alias(): + assert microsoft_xdr_pipeline() == microsoft_365_defender_pipeline() -def test_microsoft_xdr_pipeline_parent_image_false(): - """Tests passing transfer_parent_image=False to the pipeline""" +def test_microsoft_xdr_username_transformation(xdr_backend): + yaml_rule = """ + title: Test + status: test + logsource: + category: process_creation + product: windows + detection: + sel1: + CommandLine: command1 + AccountName: username1 + sel2: + CommandLine: command2 + AccountName: domain2\\username2 + sel3: + CommandLine: command3 + InitiatingProcessAccountName: + - username3 + - domain4\\username4 + sel4: + AccountName: username5 + condition: any of sel* + """ + expected_result = [ + "DeviceProcessEvents\n| " + 'where (ProcessCommandLine =~ "command1" and AccountName =~ "username1") or ' + '(ProcessCommandLine =~ "command2" and (AccountName =~ "username2" and AccountDomain =~ "domain2")) or ' + '(ProcessCommandLine =~ "command3" and (InitiatingProcessAccountName =~ "username3" or ' + '(InitiatingProcessAccountName =~ "username4" and InitiatingProcessAccountDomain =~ "domain4"))) or ' + 'AccountName =~ "username5"' + ] + + assert xdr_backend.convert(SigmaCollection.from_yaml(yaml_rule)) == expected_result + assert xdr_backend.convert_rule(SigmaRule.from_yaml(yaml_rule)) == expected_result + + +def test_microsoft_xdr_hashes_values_transformation(xdr_backend): + yaml_rule = """ + title: Test + status: test + logsource: + category: process_creation + product: windows + detection: + sel1: + Hashes: + - md5|e708864855f3bb69c4d9a213b9108b9f + - sha1|00ea1da4192a2030f9ae023de3b3143ed647bbab + - sha256|6bbb0da1891646e58eb3e6a63af3a6fc3c8eb5a0d44824cba581d2e14a0450cf + sel2: + Hashes: + - 0b49939d6415354c950b142a0b1e696a + - 4b2b79b6f371ca18f1216461cffeaddf6848a50e + - 8f16f88cfa1cf0d17c75403aa9614d806ebc00419763e0ecac3860decbcd9988 + - invalidhashvalue + condition: any of sel* + """ + expected_result = [ + "DeviceProcessEvents\n" + '| where (MD5 =~ "e708864855f3bb69c4d9a213b9108b9f" or SHA1 =~ "00ea1da4192a2030f9ae023de3b3143ed647bbab" ' + 'or SHA256 =~ "6bbb0da1891646e58eb3e6a63af3a6fc3c8eb5a0d44824cba581d2e14a0450cf") or ' + '(MD5 =~ "0b49939d6415354c950b142a0b1e696a" or SHA1 =~ "4b2b79b6f371ca18f1216461cffeaddf6848a50e" or ' + 'SHA256 =~ "8f16f88cfa1cf0d17c75403aa9614d806ebc00419763e0ecac3860decbcd9988")' + ] + + assert xdr_backend.convert(SigmaCollection.from_yaml(yaml_rule)) == expected_result + assert xdr_backend.convert_rule(SigmaRule.from_yaml(yaml_rule)) == expected_result + + +def test_microsoft_xdr_process_creation_simple(xdr_backend): + yaml_rule = """ + title: Test + status: test + logsource: + category: process_creation + product: windows + detection: + sel: + CommandLine: val1 + Image: val2 + condition: sel + """ + expected_result = ['DeviceProcessEvents\n| where ProcessCommandLine =~ "val1" and FolderPath =~ "val2"'] + + assert xdr_backend.convert(SigmaCollection.from_yaml(yaml_rule)) == expected_result + assert xdr_backend.convert_rule(SigmaRule.from_yaml(yaml_rule)) == expected_result + + +def test_microsoft_xdr_image_load_simple(xdr_backend): + yaml_rule = """ + title: Test + status: test + logsource: + category: image_load + product: windows + detection: + sel: + ImageLoaded: val1 + sha1: val2 + condition: sel + """ + expected_result = ['DeviceImageLoadEvents\n| where FolderPath =~ "val1" and SHA1 =~ "val2"'] + + assert xdr_backend.convert(SigmaCollection.from_yaml(yaml_rule)) == expected_result + assert xdr_backend.convert_rule(SigmaRule.from_yaml(yaml_rule)) == expected_result + + +def test_microsoft_xdr_file_access_simple(xdr_backend): + yaml_rule = """ + title: Test + status: test + logsource: + category: file_access + product: windows + detection: + sel: + TargetFilename: val1 + Image: val2 + condition: sel + """ + expected_result = ['DeviceFileEvents\n| where FolderPath =~ "val1" and InitiatingProcessFolderPath =~ "val2"'] + + assert xdr_backend.convert(SigmaCollection.from_yaml(yaml_rule)) == expected_result + assert xdr_backend.convert_rule(SigmaRule.from_yaml(yaml_rule)) == expected_result + + +def test_microsoft_xdr_file_change_simple(xdr_backend): + yaml_rule = """ + title: Test + status: test + logsource: + category: file_change + product: windows + detection: + sel: + TargetFilename: val1 + Image: val2 + condition: sel + """ + expected_result = ['DeviceFileEvents\n| where FolderPath =~ "val1" and InitiatingProcessFolderPath =~ "val2"'] + + assert xdr_backend.convert(SigmaCollection.from_yaml(yaml_rule)) == expected_result + assert xdr_backend.convert_rule(SigmaRule.from_yaml(yaml_rule)) == expected_result + + +def test_microsoft_xdr_file_delete_simple(xdr_backend): + yaml_rule = """ + title: Test + status: test + logsource: + category: file_delete + product: windows + detection: + sel: + TargetFilename: val1 + Image: val2 + condition: sel + """ + expected_result = ['DeviceFileEvents\n| where FolderPath =~ "val1" and InitiatingProcessFolderPath =~ "val2"'] + + assert xdr_backend.convert(SigmaCollection.from_yaml(yaml_rule)) == expected_result + assert xdr_backend.convert_rule(SigmaRule.from_yaml(yaml_rule)) == expected_result + + +def test_microsoft_xdr_file_event_simple(xdr_backend): + yaml_rule = """ + title: Test + status: test + logsource: + category: file_change + product: windows + detection: + sel: + TargetFilename: val1 + Image: val2 + condition: sel + """ + expected_result = ['DeviceFileEvents\n| where FolderPath =~ "val1" and InitiatingProcessFolderPath =~ "val2"'] + + assert xdr_backend.convert(SigmaCollection.from_yaml(yaml_rule)) == expected_result + assert xdr_backend.convert_rule(SigmaRule.from_yaml(yaml_rule)) == expected_result + + +def test_microsoft_xdr_file_rename_simple(xdr_backend): + yaml_rule = """ + title: Test + status: test + logsource: + category: file_rename + product: windows + detection: + sel: + TargetFilename: val1 + Image: val2 + condition: sel + """ + expected_result = ['DeviceFileEvents\n| where FolderPath =~ "val1" and InitiatingProcessFolderPath =~ "val2"'] + + assert xdr_backend.convert(SigmaCollection.from_yaml(yaml_rule)) == expected_result + assert xdr_backend.convert_rule(SigmaRule.from_yaml(yaml_rule)) == expected_result + + +def test_microsoft_xdr_registry_add_simple(xdr_backend): + yaml_rule = """ + title: Test + status: test + logsource: + category: registry_add + product: windows + detection: + sel: + Image: val1 + TargetObject: val2 + condition: sel + """ + expected_result = ['DeviceRegistryEvents\n| where InitiatingProcessFolderPath =~ "val1" and RegistryKey =~ "val2"'] + + assert xdr_backend.convert(SigmaCollection.from_yaml(yaml_rule)) == expected_result + assert xdr_backend.convert_rule(SigmaRule.from_yaml(yaml_rule)) == expected_result + + +def test_microsoft_xdr_registry_delete_simple(xdr_backend): + yaml_rule = """ + title: Test + status: test + logsource: + category: registry_delete + product: windows + detection: + sel: + Image: val1 + TargetObject: val2 + condition: sel + """ + expected_result = ['DeviceRegistryEvents\n| where InitiatingProcessFolderPath =~ "val1" and RegistryKey =~ "val2"'] + + assert xdr_backend.convert(SigmaCollection.from_yaml(yaml_rule)) == expected_result + assert xdr_backend.convert_rule(SigmaRule.from_yaml(yaml_rule)) == expected_result + + +def test_microsoft_xdr_registry_event_simple(xdr_backend): + yaml_rule = """ + title: Test + status: test + logsource: + category: registry_event + product: windows + detection: + sel: + Image: val1 + TargetObject: val2 + condition: sel + """ + expected_result = ['DeviceRegistryEvents\n| where InitiatingProcessFolderPath =~ "val1" and RegistryKey =~ "val2"'] + + assert xdr_backend.convert(SigmaCollection.from_yaml(yaml_rule)) == expected_result + assert xdr_backend.convert_rule(SigmaRule.from_yaml(yaml_rule)) == expected_result + + +def test_microsoft_xdr_registry_set_simple(xdr_backend): + yaml_rule = """ + title: Test + status: test + logsource: + category: registry_set + product: windows + detection: + sel: + Image: val1 + TargetObject: val2 + condition: sel + """ + expected_result = ['DeviceRegistryEvents\n| where InitiatingProcessFolderPath =~ "val1" and RegistryKey =~ "val2"'] + + assert xdr_backend.convert(SigmaCollection.from_yaml(yaml_rule)) == expected_result + assert xdr_backend.convert_rule(SigmaRule.from_yaml(yaml_rule)) == expected_result + + +def test_microsoft_xdr_process_creation_field_mapping(xdr_backend): + yaml_rule = """ + title: Test + status: test + logsource: + category: process_creation + product: windows + detection: + sel: + Image: C:\\Path\\to\\notmalware.exe + FileVersion: 1 + Description: A Description + Product: pySigma + Company: AttackIQ + OriginalFileName: malware.exe + ProcessId: 2 + CommandLine: definitely not malware + User: heyitsmeyourbrother + IntegrityLevel: 1 + sha1: a123123123 + sha256: a123123123 + md5: a123123123 + ParentProcessId: 1 + ParentImage: C:\\Windows\\Temp\\freemoney.pdf + ParentCommandLine: freemoney.pdf test exe please ignore + ParentUser: heyitsmeyourparent + condition: sel + """ + expected_result = [ + "DeviceProcessEvents\n| " + 'where FolderPath =~ "C:\\\\Path\\\\to\\\\notmalware.exe" and ' + "ProcessVersionInfoProductVersion == 1 and " + 'ProcessVersionInfoFileDescription =~ "A Description" and ' + 'ProcessVersionInfoProductName =~ "pySigma" and ' + 'ProcessVersionInfoCompanyName =~ "AttackIQ" and ' + 'ProcessVersionInfoOriginalFileName =~ "malware.exe" and ' + "ProcessId == 2 and " + 'ProcessCommandLine =~ "definitely not malware" and ' + 'AccountName =~ "heyitsmeyourbrother" and ' + "ProcessIntegrityLevel == 1 and " + 'SHA1 =~ "a123123123" and ' + 'SHA256 =~ "a123123123" and ' + 'MD5 =~ "a123123123" and ' + "InitiatingProcessId == 1 and " + 'InitiatingProcessFolderPath =~ "C:\\\\Windows\\\\Temp\\\\freemoney.pdf" and ' + 'InitiatingProcessCommandLine =~ "freemoney.pdf test exe please ignore" and ' + 'InitiatingProcessAccountName =~ "heyitsmeyourparent"' + ] + + assert xdr_backend.convert(SigmaCollection.from_yaml(yaml_rule)) == expected_result + assert xdr_backend.convert_rule(SigmaRule.from_yaml(yaml_rule)) == expected_result + + +def test_microsoft_xdr_image_load_field_mapping(xdr_backend): + yaml_rule = """ + title: Test + status: test + logsource: + category: image_load + product: windows + detection: + sel: + ProcessId: 1 + Image: C:\\Temp\\notmalware.exe + ImageLoaded: C:\\Temp\\definitelynotmalware.exe + FileVersion: 1 + Description: A Description + Product: A Product + Company: AttackIQ + OriginalFileName: freemoney.pdf.exe + md5: e708864855f3bb69c4d9a213b9108b9f + sha1: 00ea1da4192a2030f9ae023de3b3143ed647bbab + sha256: 6bbb0da1891646e58eb3e6a63af3a6fc3c8eb5a0d44824cba581d2e14a0450cf + User: username + condition: sel + """ + expected_result = [ + "DeviceImageLoadEvents\n| " + 'where InitiatingProcessId == 1 and InitiatingProcessFolderPath =~ "C:\\\\Temp\\\\notmalware.exe" and ' + 'FolderPath =~ "C:\\\\Temp\\\\definitelynotmalware.exe" and InitiatingProcessVersionInfoProductVersion == 1 ' + 'and InitiatingProcessVersionInfoFileDescription =~ "A Description" and ' + 'InitiatingProcessVersionInfoProductName =~ "A Product" and ' + 'InitiatingProcessVersionInfoCompanyName =~ "AttackIQ" and ' + 'InitiatingProcessVersionInfoOriginalFileName =~ "freemoney.pdf.exe" and ' + 'MD5 =~ "e708864855f3bb69c4d9a213b9108b9f" and SHA1 =~ "00ea1da4192a2030f9ae023de3b3143ed647bbab" and ' + 'SHA256 =~ "6bbb0da1891646e58eb3e6a63af3a6fc3c8eb5a0d44824cba581d2e14a0450cf" and ' + 'InitiatingProcessAccountName =~ "username"' + ] + + assert xdr_backend.convert(SigmaCollection.from_yaml(yaml_rule)) == expected_result + assert xdr_backend.convert_rule(SigmaRule.from_yaml(yaml_rule)) == expected_result + + +def test_microsoft_xdr_file_event_field_mapping(xdr_backend): + yaml_rule = """ + title: Test + status: test + logsource: + category: file_event + product: windows + detection: + sel: + ProcessId: 1 + Image: C:\\Path\\To\\process.exe + TargetFilename: C:\\Temp\\passwords.txt + User: username + md5: e708864855f3bb69c4d9a213b9108b9f + sha1: 00ea1da4192a2030f9ae023de3b3143ed647bbab + sha256: 6bbb0da1891646e58eb3e6a63af3a6fc3c8eb5a0d44824cba581d2e14a0450cf + condition: sel + """ + expected_result = [ + "DeviceFileEvents\n| " + 'where InitiatingProcessId == 1 and InitiatingProcessFolderPath =~ "C:\\\\Path\\\\To\\\\process.exe" and ' + 'FolderPath =~ "C:\\\\Temp\\\\passwords.txt" and RequestAccountName =~ "username" and ' + 'MD5 =~ "e708864855f3bb69c4d9a213b9108b9f" and SHA1 =~ "00ea1da4192a2030f9ae023de3b3143ed647bbab" and ' + 'SHA256 =~ "6bbb0da1891646e58eb3e6a63af3a6fc3c8eb5a0d44824cba581d2e14a0450cf"' + ] + + assert xdr_backend.convert(SigmaCollection.from_yaml(yaml_rule)) == expected_result + assert xdr_backend.convert_rule(SigmaRule.from_yaml(yaml_rule)) == expected_result + + +def test_microsoft_xdr_registry_event_field_mapping(xdr_backend): + yaml_rule = """ + title: Test + status: test + logsource: + category: registry_set + product: windows + detection: + sel: + EventType: CreateKey + ProcessId: 1 + Image: C:\\Temp\\reg.exe + TargetObject: HKEY_LOCAL_MACHINE\\SYSTEM\\ControlSet001\\services\\TrustedInstaller + Details: attackiq + User: username + condition: sel + """ + expected_result = [ + "DeviceRegistryEvents\n| " + 'where ActionType =~ "RegistryKeyCreated" and InitiatingProcessId == 1 and ' + 'InitiatingProcessFolderPath =~ "C:\\\\Temp\\\\reg.exe" and ' + 'RegistryKey =~ "HKEY_LOCAL_MACHINE\\\\SYSTEM\\\\ControlSet001\\\\services\\\\TrustedInstaller" and ' + 'RegistryValueData =~ "attackiq" and InitiatingProcessAccountName =~ "username"' + ] + + assert xdr_backend.convert(SigmaCollection.from_yaml(yaml_rule)) == expected_result + assert xdr_backend.convert_rule(SigmaRule.from_yaml(yaml_rule)) == expected_result + + +def test_microsoft_xdr_network_connection_field_mapping(xdr_backend): + yaml_rule = """ + title: Test + status: test + logsource: + category: network_connection + product: windows + detection: + sel: + ProcessId: 1 + Image: C:\\Temp\\notcobaltstrike.exe + User: admin + Protocol: TCP + SourceIp: 127.0.0.1 + SourcePort: 12345 + DestinationIp: 1.2.3.4 + DestinationPort: 50050 + DestinationHostname: notanatp.net + condition: sel + """ + expected_result = [ + "DeviceNetworkEvents\n| " + "where InitiatingProcessId == 1 and " + 'InitiatingProcessFolderPath =~ "C:\\\\Temp\\\\notcobaltstrike.exe" and ' + 'InitiatingProcessAccountName =~ "admin" and Protocol =~ "TCP" and LocalIP =~ "127.0.0.1" and ' + 'LocalPort == 12345 and RemoteIP =~ "1.2.3.4" and RemotePort == 50050 and ' + 'RemoteUrl =~ "notanatp.net"' + ] + + assert xdr_backend.convert(SigmaCollection.from_yaml(yaml_rule)) == expected_result + assert xdr_backend.convert_rule(SigmaRule.from_yaml(yaml_rule)) == expected_result + + +def test_microsoft_xdr_network_connection_cidr(xdr_backend): + yaml_rule = """ + title: Test + status: test + logsource: + category: network_connection + product: windows + detection: + sel: + SourceIp|cidr: '10.10.0.0/24' + DestinationIp|cidr: '10.11.0.0/24' + condition: sel + """ + expected_result = [ + "DeviceNetworkEvents\n| " + 'where ipv4_is_in_range(LocalIP, "10.10.0.0/24") and ipv4_is_in_range(RemoteIP, "10.11.0.0/24")' + ] + + assert xdr_backend.convert(SigmaCollection.from_yaml(yaml_rule)) == expected_result + assert xdr_backend.convert_rule(SigmaRule.from_yaml(yaml_rule)) == expected_result + + +def test_microsoft_xdr_pipeline_registrykey_replacements(xdr_backend): + yaml_rule = """ + title: Test + status: test + logsource: + category: registry_event + product: windows + detection: + sel1: + RegistryKey: HKLM\\TestKey1 + PreviousRegistryKey: HKLM\\TestKey1 + sel2: + RegistryKey: HKU\\TestKey2 + PreviousRegistryKey: HKU\\TestKey2 + sel3: + RegistryKey: HKLM\\System\\CurrentControlSet\\TestKey3 + PreviousRegistryKey: HKLM\\System\\CurrentControlSet\\TestKey3 + sel4: + RegistryKey: hkcr\\TestKey4 + PreviousRegistryKey: hkcr\\TestKey4 + condition: any of sel* + """ + expected_result = [ + 'DeviceRegistryEvents\n| where (RegistryKey =~ "HKEY_LOCAL_MACHINE\\\\TestKey1" and ' + 'PreviousRegistryKey =~ "HKEY_LOCAL_MACHINE\\\\TestKey1") or ' + '(RegistryKey =~ "HKEY_USERS\\\\TestKey2" and PreviousRegistryKey =~ "HKEY_USERS\\\\TestKey2") or ' + '(RegistryKey =~ "HKEY_LOCAL_MACHINE\\\\SYSTEM\\\\CurrentControlSet001\\\\TestKey3" and PreviousRegistryKey =~ "HKEY_LOCAL_MACHINE\\\\SYSTEM\\\\CurrentControlSet001\\\\TestKey3") or ' + '(RegistryKey =~ "HKEY_LOCAL_MACHINE\\\\CLASSES\\\\TestKey4" and PreviousRegistryKey =~ "HKEY_LOCAL_MACHINE\\\\CLASSES\\\\TestKey4")' + ] + + assert xdr_backend.convert(SigmaCollection.from_yaml(yaml_rule)) == expected_result + assert xdr_backend.convert_rule(SigmaRule.from_yaml(yaml_rule)) == expected_result + + +def test_microsoft_xdr_pipeline_registry_actiontype_replacements(xdr_backend): + yaml_rule = """ + title: Test + status: test + logsource: + category: registry_event + product: windows + detection: + sel1: + ActionType: CreateKey + sel2: + ActionType: DeleteKey + sel3: + ActionType: SetValue + sel4: + ActionType: RenameKey + condition: any of sel* + """ + expected_result = [ + "DeviceRegistryEvents\n| " + 'where ActionType =~ "RegistryKeyCreated" or ' + '(ActionType in~ ("RegistryKeyDeleted", "RegistryValueDeleted")) or ' + 'ActionType =~ "RegistryValueSet" or ' + '(ActionType in~ ("RegistryValueSet", "RegistryKeyCreated"))' + ] + + assert xdr_backend.convert(SigmaCollection.from_yaml(yaml_rule)) == expected_result + assert xdr_backend.convert_rule(SigmaRule.from_yaml(yaml_rule)) == expected_result + + +def test_microsoft_xdr_pipeline_valid_hash_in_list(xdr_backend): + yaml_rule = """ + title: test + status: test + logsource: + category: process_creation + product: windows + detection: + sel: + Hashes: + - MD5=6444f8a34e99b8f7d9647de66aabe516 + - IMPHASH=dfd6aa3f7b2b1035b76b718f1ddc689f + - IMPHASH=1a6cca4d5460b1710a12dea39e4a592c + condition: sel + """ + expected_result = ["DeviceProcessEvents\n| " 'where MD5 =~ "6444f8a34e99b8f7d9647de66aabe516"'] + + assert xdr_backend.convert(SigmaCollection.from_yaml(yaml_rule)) == expected_result + assert xdr_backend.convert_rule(SigmaRule.from_yaml(yaml_rule)) == expected_result + + +def test_microsoft_xdr_pipeline_generic_field(xdr_backend): + yaml_rule = """ + title: Test + status: test + logsource: + category: file_event + product: windows + detection: + sel1: + CommandLine: whoami + ProcessId: 1 + condition: any of sel* + """ + expected_result = [ + "DeviceFileEvents\n| " 'where InitiatingProcessCommandLine =~ "whoami" and InitiatingProcessId == 1' + ] + + assert xdr_backend.convert(SigmaCollection.from_yaml(yaml_rule)) == expected_result + assert xdr_backend.convert_rule(SigmaRule.from_yaml(yaml_rule)) == expected_result + + +def test_microsoft_xdr_pipeline_parent_image(xdr_backend): + yaml_rule = """ + title: Test + status: test + logsource: + category: file_event + product: windows + detection: + sel1: + Image: C:\\Windows\\System32\\whoami.exe + ParentImage: C:\\Windows\\System32\\cmd.exe + condition: any of sel* + """ + expected_result = [ + "DeviceFileEvents\n| " + 'where InitiatingProcessFolderPath =~ "C:\\\\Windows\\\\System32\\\\whoami.exe" and ' + 'InitiatingProcessParentFileName =~ "cmd.exe"' + ] + + assert xdr_backend.convert(SigmaCollection.from_yaml(yaml_rule)) == expected_result + assert xdr_backend.convert_rule(SigmaRule.from_yaml(yaml_rule)) == expected_result + + +def test_microsoft_xdr_pipeline_parent_image_false(xdr_backend): + yaml_rule = """ + title: Test + status: test + logsource: + category: file_event + product: windows + detection: + sel1: + Image: C:\\Windows\\System32\\whoami.exe + ParentImage: C:\\Windows\\System32\\cmd.exe + condition: any of sel* + """ with pytest.raises( SigmaTransformationError, match="Invalid SigmaDetectionItem field name encountered.*DeviceFileEvents" ): KustoBackend(processing_pipeline=microsoft_xdr_pipeline(transform_parent_image=False)).convert( - SigmaCollection.from_yaml( - """ - title: Test - status: test - logsource: - category: file_event - product: windows - detection: - sel1: - Image: C:\\Windows\\System32\\whoami.exe - ParentImage: C:\\Windows\\System32\\cmd.exe - condition: any of sel* - """ - ) - ) - - -def test_microsoft_xdr_pipeline_unsupported_rule_type(): + SigmaCollection.from_yaml(yaml_rule) + ) + + +def test_microsoft_xdr_pipeline_unsupported_rule_type(xdr_backend): + yaml_rule = """ + title: test + status: test + logsource: + category: invalid_category + product: invalid_product + detection: + sel: + field: whatever + condition: sel + """ with pytest.raises(SigmaTransformationError, match="Unable to determine table name for category"): - KustoBackend(processing_pipeline=microsoft_xdr_pipeline()).convert( - SigmaCollection.from_yaml( - """ - title: test - status: test - logsource: - category: invalid_category - product: invalid_product - detection: - sel: - field: whatever - condition: sel - """ - ) - ) - - -def test_microsoft_xdr_pipeline_unsupported_field_process_creation(): + xdr_backend.convert(SigmaCollection.from_yaml(yaml_rule)) + + +def test_microsoft_xdr_pipeline_unsupported_field_process_creation(xdr_backend): + yaml_rule = """ + title: test + status: test + logsource: + category: process_creation + product: windows + detection: + sel: + CommandLine: whatever + InvalidField: forever + condition: sel + """ with pytest.raises( SigmaTransformationError, match="Invalid SigmaDetectionItem field name encountered.*DeviceProcessEvents" ): - KustoBackend(processing_pipeline=microsoft_xdr_pipeline()).convert( - SigmaCollection.from_yaml( - """ - title: test - status: test - logsource: - category: process_creation - product: windows - detection: - sel: - CommandLine: whatever - InvalidField: forever - condition: sel - """ - ) - ) - - -def test_microsoft_xdr_pipeline_unsupported_field_file_event(): + xdr_backend.convert(SigmaCollection.from_yaml(yaml_rule)) + + +def test_microsoft_xdr_pipeline_unsupported_field_file_event(xdr_backend): + yaml_rule = """ + title: test + status: test + logsource: + category: file_access + product: windows + detection: + sel: + FileName: whatever + InvalidField: forever + condition: sel + """ with pytest.raises( SigmaTransformationError, match="Invalid SigmaDetectionItem field name encountered.*DeviceFileEvents" ): - KustoBackend(processing_pipeline=microsoft_xdr_pipeline()).convert( - SigmaCollection.from_yaml( - """ - title: test - status: test - logsource: - category: file_access - product: windows - detection: - sel: - FileName: whatever - InvalidField: forever - condition: sel - """ - ) - ) - - -def test_microsoft_xdr_pipeline_unsupported_field_image_load(): + xdr_backend.convert(SigmaCollection.from_yaml(yaml_rule)) + + +def test_microsoft_xdr_pipeline_unsupported_field_image_load(xdr_backend): + yaml_rule = """ + title: test + status: test + logsource: + category: image_load + product: windows + detection: + sel: + CommandLine: whatever + InvalidField: forever + condition: sel + """ with pytest.raises( SigmaTransformationError, match="Invalid SigmaDetectionItem field name encountered.*DeviceImageLoadEvents" ): - KustoBackend(processing_pipeline=microsoft_xdr_pipeline()).convert( - SigmaCollection.from_yaml( - """ - title: test - status: test - logsource: - category: image_load - product: windows - detection: - sel: - CommandLine: whatever - InvalidField: forever - condition: sel - """ - ) - ) - - -def test_microsoft_xdr_pipeline_unsupported_field_registry_event(): + xdr_backend.convert(SigmaCollection.from_yaml(yaml_rule)) + + +def test_microsoft_xdr_pipeline_unsupported_field_registry_event(xdr_backend): + yaml_rule = """ + title: test + status: test + logsource: + category: registry_add + product: windows + detection: + sel: + CommandLine: whatever + InvalidField: forever + condition: sel + """ with pytest.raises( SigmaTransformationError, match="Invalid SigmaDetectionItem field name encountered.*DeviceRegistryEvents" ): - KustoBackend(processing_pipeline=microsoft_xdr_pipeline()).convert( - SigmaCollection.from_yaml( - """ - title: test - status: test - logsource: - category: registry_add - product: windows - detection: - sel: - CommandLine: whatever - InvalidField: forever - condition: sel - """ - ) - ) - - -def test_microsoft_xdr_pipeline_unsupported_field_network_connection(): + xdr_backend.convert(SigmaCollection.from_yaml(yaml_rule)) + + +def test_microsoft_xdr_pipeline_unsupported_field_network_connection(xdr_backend): + yaml_rule = """ + title: test + status: test + logsource: + category: network_connection + product: windows + detection: + sel: + CommandLine: whatever + InvalidField: forever + condition: sel + """ with pytest.raises( SigmaTransformationError, match="Invalid SigmaDetectionItem field name encountered.*DeviceNetworkEvents" ): - KustoBackend(processing_pipeline=microsoft_xdr_pipeline()).convert( - SigmaCollection.from_yaml( - """ - title: test - status: test - logsource: - category: network_connection - product: windows - detection: - sel: - CommandLine: whatever - InvalidField: forever - condition: sel - """ - ) - ) - - -def test_microsoft_xdr_pipeline_no_valid_hashes(): + xdr_backend.convert(SigmaCollection.from_yaml(yaml_rule)) + + +def test_microsoft_xdr_pipeline_no_valid_hashes(xdr_backend): + yaml_rule = """ + title: test + status: test + logsource: + category: network_connection + product: windows + detection: + sel: + Hashes: + - IMPHASH=6444f8a34e99b8f7d9647de66aabe516 + - IMPHASH=dfd6aa3f7b2b1035b76b718f1ddc689f + - IMPHASH=1a6cca4d5460b1710a12dea39e4a592c + condition: sel + """ with pytest.raises(InvalidHashAlgorithmError): - KustoBackend(processing_pipeline=microsoft_xdr_pipeline()).convert( - SigmaCollection.from_yaml( - """ - title: test - status: test - logsource: - category: network_connection - product: windows - detection: - sel: - Hashes: - - IMPHASH=6444f8a34e99b8f7d9647de66aabe516 - - IMPHASH=dfd6aa3f7b2b1035b76b718f1ddc689f - - IMPHASH=1a6cca4d5460b1710a12dea39e4a592c - condition: sel - """ - ) - ) + xdr_backend.convert(SigmaCollection.from_yaml(yaml_rule)) + + +def test_microsoft_xdr_pipeline_custom_table(xdr_backend): + yaml_rule = """ + title: Test + status: test + logsource: + category: process_creation + product: windows + detection: + sel: + Image: actuallyafileevent.exe + condition: sel + """ + expected_result = ["DeviceFileEvents\n| " 'where InitiatingProcessFolderPath =~ "actuallyafileevent.exe"'] - -def test_microsoft_xdr_pipeline_custom_table(): - """Tests to ensure custom table names override category table name mappings and field name mappings""" assert ( KustoBackend(processing_pipeline=microsoft_xdr_pipeline(query_table="DeviceFileEvents")).convert( - SigmaCollection.from_yaml( - """ - title: Test - status: test - logsource: - category: process_creation - product: windows - detection: - sel: - Image: actuallyafileevent.exe - condition: sel - """ - ) + SigmaCollection.from_yaml(yaml_rule) ) - == ["DeviceFileEvents\n| " 'where InitiatingProcessFolderPath =~ "actuallyafileevent.exe"'] + == expected_result ) - -def test_microsoft_xdr_pipeline_custom_table_invalid_category(): - """Tests to ensure custom table names override category table name mappings and field name mappings""" + +def test_microsoft_xdr_pipeline_custom_table_invalid_category(xdr_backend): + yaml_rule = """ + title: Test + status: test + logsource: + product: windows + detection: + sel: + Image: actuallyafileevent.exe + condition: sel + """ + expected_result = ["DeviceFileEvents\n| " 'where InitiatingProcessFolderPath =~ "actuallyafileevent.exe"'] + assert ( KustoBackend(processing_pipeline=microsoft_xdr_pipeline(query_table="DeviceFileEvents")).convert( - SigmaCollection.from_yaml( - """ - title: Test - status: test - logsource: - product: windows - detection: - sel: - Image: actuallyafileevent.exe - condition: sel - """ - ) + SigmaCollection.from_yaml(yaml_rule) ) - == ["DeviceFileEvents\n| " 'where InitiatingProcessFolderPath =~ "actuallyafileevent.exe"'] + == expected_result ) - - -def test_microsoft_xdr_pipeline_custom_table_invalid_category_no_table(): - """Tests to ensure custom table names override category table name mappings and field name mappings""" - with pytest.raises(SigmaTransformationError, match="Unable to determine table name for category"): - KustoBackend(processing_pipeline=microsoft_xdr_pipeline()).convert( - SigmaCollection.from_yaml( - """ - title: Test - status: test - logsource: - product: windows - detection: - sel: - Image: actuallyafileevent.exe - condition: sel - """ - ) - ) - \ No newline at end of file diff --git a/tests/test_pipelines_sentinelasim.py b/tests/test_pipelines_sentinelasim.py index 6be3997..bb71009 100644 --- a/tests/test_pipelines_sentinelasim.py +++ b/tests/test_pipelines_sentinelasim.py @@ -4,234 +4,220 @@ from sigma.collection import SigmaCollection from sigma.exceptions import SigmaTransformationError from sigma.pipelines.sentinelasim import sentinel_asim_pipeline - - -def test_sentinel_asim_process_creation_field_mapping(): - assert ( - KustoBackend(processing_pipeline=sentinel_asim_pipeline()).convert( - SigmaCollection.from_yaml( - """ - title: Test Process Creation - status: test - logsource: - category: process_creation - product: windows - detection: - sel: - Image: C:\\Windows\\System32\\cmd.exe - CommandLine: whoami - User: SYSTEM - ProcessId: 1234 - condition: sel - """ - ) - ) - == [ - 'imProcessCreate\n| where TargetProcessName =~ "C:\\\\Windows\\\\System32\\\\cmd.exe" and TargetProcessCommandLine =~ "whoami" and TargetUsername =~ "SYSTEM" and TargetProcessId == 1234' - ] - ) - - -def test_sentinel_asim_network_connection_field_mapping(): - assert ( - KustoBackend(processing_pipeline=sentinel_asim_pipeline()).convert( - SigmaCollection.from_yaml( - """ - title: Test Network Connection - status: test - logsource: - category: network_connection - product: windows - detection: - sel: - DestinationIp: 8.8.8.8 - DestinationPort: 53 - Protocol: udp - condition: sel - """ - ) - ) - == ['imNetworkSession\n| where DstIpAddr =~ "8.8.8.8" and DstPortNumber == 53 and NetworkProtocol =~ "udp"'] - ) - - -def test_sentinel_asim_registry_event_field_mapping(): - assert ( - KustoBackend(processing_pipeline=sentinel_asim_pipeline()).convert( - SigmaCollection.from_yaml( - """ - title: Test Registry Event - status: test - logsource: - category: registry_event - product: windows - detection: - sel: - TargetObject: HKLM\\SOFTWARE\\Microsoft\\Windows\\CurrentVersion\\Run - EventType: SetValue - condition: sel - """ - ) - ) - == [ - 'imRegistry\n| where RegistryKey =~ "HKEY_LOCAL_MACHINE\\\\SOFTWARE\\\\Microsoft\\\\Windows\\\\CurrentVersion\\\\Run" and EventType =~ "RegistryValueSet"' - ] - ) +from sigma.rule import SigmaRule + + +@pytest.fixture +def asim_backend(): + return KustoBackend(processing_pipeline=sentinel_asim_pipeline()) + + +def test_sentinel_asim_process_creation_field_mapping(asim_backend): + yaml_rule = """ + title: Test Process Creation + status: test + logsource: + category: process_creation + product: windows + detection: + sel: + Image: C:\\Windows\\System32\\cmd.exe + CommandLine: whoami + User: SYSTEM + ProcessId: 1234 + condition: sel + """ + expected_result = [ + 'imProcessCreate\n| where TargetProcessName =~ "C:\\\\Windows\\\\System32\\\\cmd.exe" and TargetProcessCommandLine =~ "whoami" and TargetUsername =~ "SYSTEM" and TargetProcessId == 1234' + ] + + assert asim_backend.convert(SigmaCollection.from_yaml(yaml_rule)) == expected_result + assert asim_backend.convert_rule(SigmaRule.from_yaml(yaml_rule)) == expected_result + + +def test_sentinel_asim_network_connection_field_mapping(asim_backend): + yaml_rule = """ + title: Test Network Connection + status: test + logsource: + category: network_connection + product: windows + detection: + sel: + DestinationIp: 8.8.8.8 + DestinationPort: 53 + Protocol: udp + condition: sel + """ + expected_result = [ + 'imNetworkSession\n| where DstIpAddr =~ "8.8.8.8" and DstPortNumber == 53 and NetworkProtocol =~ "udp"' + ] + + assert asim_backend.convert(SigmaCollection.from_yaml(yaml_rule)) == expected_result + assert asim_backend.convert_rule(SigmaRule.from_yaml(yaml_rule)) == expected_result + + +def test_sentinel_asim_registry_event_field_mapping(asim_backend): + yaml_rule = """ + title: Test Registry Event + status: test + logsource: + category: registry_event + product: windows + detection: + sel: + TargetObject: HKLM\\SOFTWARE\\Microsoft\\Windows\\CurrentVersion\\Run + EventType: SetValue + condition: sel + """ + expected_result = [ + 'imRegistry\n| where RegistryKey =~ "HKEY_LOCAL_MACHINE\\\\SOFTWARE\\\\Microsoft\\\\Windows\\\\CurrentVersion\\\\Run" and EventType =~ "RegistryValueSet"' + ] + + assert asim_backend.convert(SigmaCollection.from_yaml(yaml_rule)) == expected_result + assert asim_backend.convert_rule(SigmaRule.from_yaml(yaml_rule)) == expected_result def test_sentinel_asim_custom_table(): - assert ( - KustoBackend(processing_pipeline=sentinel_asim_pipeline(query_table="imFileEvent")).convert( - SigmaCollection.from_yaml( - """ - title: Test Custom Table - status: test - logsource: - category: process_creation - product: windows - detection: - sel: - Image: malware.exe - condition: sel - """ - ) - ) - == ['imFileEvent\n| where TargetFilePath =~ "malware.exe"'] - ) - - -def test_sentinel_asim_unsupported_field(): + yaml_rule = """ + title: Test Custom Table + status: test + logsource: + category: process_creation + product: windows + detection: + sel: + Image: malware.exe + condition: sel + """ + expected_result = ['imFileEvent\n| where TargetFilePath =~ "malware.exe"'] + + custom_backend = KustoBackend(processing_pipeline=sentinel_asim_pipeline(query_table="imFileEvent")) + assert custom_backend.convert(SigmaCollection.from_yaml(yaml_rule)) == expected_result + assert custom_backend.convert_rule(SigmaRule.from_yaml(yaml_rule)) == expected_result + + +def test_sentinel_asim_unsupported_field(asim_backend): + yaml_rule = """ + title: Test Unsupported Field + status: test + logsource: + category: process_creation + product: windows + detection: + sel: + UnsupportedField: value + condition: sel + """ with pytest.raises( SigmaTransformationError, match="Invalid SigmaDetectionItem field name encountered: UnsupportedField" ): - KustoBackend(processing_pipeline=sentinel_asim_pipeline()).convert( - SigmaCollection.from_yaml( - """ - title: Test Unsupported Field - status: test - logsource: - category: process_creation - product: windows - detection: - sel: - UnsupportedField: value - condition: sel - """ - ) - ) - - -def test_sentinel_asim_file_event(): - assert ( - KustoBackend(processing_pipeline=sentinel_asim_pipeline()).convert( - SigmaCollection.from_yaml( - """ - title: Test File Event - status: test - logsource: - category: file_event - product: windows - detection: - sel: - Image: C:\\Windows\\explorer.exe - condition: sel - """ - ) - ) - == ['imFileEvent\n| where TargetFilePath =~ "C:\\\\Windows\\\\explorer.exe"'] - ) + asim_backend.convert(SigmaCollection.from_yaml(yaml_rule)) + + +def test_sentinel_asim_file_event(asim_backend): + yaml_rule = """ + title: Test File Event + status: test + logsource: + category: file_event + product: windows + detection: + sel: + Image: C:\\Windows\\explorer.exe + condition: sel + """ + expected_result = ['imFileEvent\n| where TargetFilePath =~ "C:\\\\Windows\\\\explorer.exe"'] + + assert asim_backend.convert(SigmaCollection.from_yaml(yaml_rule)) == expected_result + assert asim_backend.convert_rule(SigmaRule.from_yaml(yaml_rule)) == expected_result def test_sentinel_asim_pipeline_custom_table_invalid_category(): - assert ( - KustoBackend(processing_pipeline=sentinel_asim_pipeline(query_table="imFileEvent")).convert( - SigmaCollection.from_yaml( - """ - title: Test Custom Table - status: test - logsource: - category: blah - product: windows - detection: - sel: - Image: malware.exe - condition: sel - """ - ) - ) - == ['imFileEvent\n| where TargetFilePath =~ "malware.exe"'] - ) - - -def test_sentinel_asim_processcreate_hashes_field_values(): - assert ( - KustoBackend(processing_pipeline=sentinel_asim_pipeline()).convert( - SigmaCollection.from_yaml( - """ - title: Test ProcessCreate Hashes Field Values - status: test - logsource: - category: process_creation - product: windows - detection: - sel: - Hashes: - - md5=1234567890abcdef1234567890abcdef - - sha1=1234567890abcdef1234567890abcdef12345678 - - sha256=1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef - - imphash=1234567890abcdef1234567890abcdef - - sha512=1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef - condition: sel - """ - ) - ) - == [ - 'imProcessCreate\n| where TargetProcessMD5 =~ "1234567890abcdef1234567890abcdef" or TargetProcessSHA1 =~ "1234567890abcdef1234567890abcdef12345678" or TargetProcessSHA256 =~ "1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef" or TargetProcessIMPHASH =~ "1234567890abcdef1234567890abcdef" or TargetProcessSHA512 =~ "1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef"' - ] - ) - -def test_sentinel_asim_fileevent_hashes_field_values(): - assert ( - KustoBackend(processing_pipeline=sentinel_asim_pipeline()).convert( - SigmaCollection.from_yaml( - """ - title: Test FileEvent Hashes Field Values - status: test - logsource: - category: file_event - product: windows - detection: - sel: - Hashes: - - md5=1234567890abcdef1234567890abcdef - - sha1=1234567890abcdef1234567890abcdef12345678 - - sha256=1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef - condition: sel - """ - ) - ) - == ['imFileEvent\n| where TargetFileMD5 =~ "1234567890abcdef1234567890abcdef" or TargetFileSHA1 =~ "1234567890abcdef1234567890abcdef12345678" or TargetFileSHA256 =~ "1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef"'] - ) - -def test_sentinel_asim_webrequest_hashes_field_values(): - assert ( - KustoBackend(processing_pipeline=sentinel_asim_pipeline()).convert( - SigmaCollection.from_yaml( - """ - title: Test WebRequest Hashes Field Values - status: test - logsource: - category: proxy - product: windows - detection: - sel: - Hashes: - - md5=1234567890abcdef1234567890abcdef - - sha1=1234567890abcdef1234567890abcdef12345678 - - sha256=1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef - condition: sel - """ - ) - ) - == ['imWebSession\n| where FileMD5 =~ "1234567890abcdef1234567890abcdef" or FileSHA1 =~ "1234567890abcdef1234567890abcdef12345678" or FileSHA256 =~ "1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef"'] - ) + yaml_rule = """ + title: Test Custom Table + status: test + logsource: + category: blah + product: windows + detection: + sel: + Image: malware.exe + condition: sel + """ + expected_result = ['imFileEvent\n| where TargetFilePath =~ "malware.exe"'] + + custom_backend = KustoBackend(processing_pipeline=sentinel_asim_pipeline(query_table="imFileEvent")) + assert custom_backend.convert(SigmaCollection.from_yaml(yaml_rule)) == expected_result + assert custom_backend.convert_rule(SigmaRule.from_yaml(yaml_rule)) == expected_result + + +def test_sentinel_asim_processcreate_hashes_field_values(asim_backend): + yaml_rule = """ + title: Test ProcessCreate Hashes Field Values + status: test + logsource: + category: process_creation + product: windows + detection: + sel: + Hashes: + - md5=1234567890abcdef1234567890abcdef + - sha1=1234567890abcdef1234567890abcdef12345678 + - sha256=1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef + - imphash=1234567890abcdef1234567890abcdef + - sha512=1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef + condition: sel + """ + expected_result = [ + 'imProcessCreate\n| where TargetProcessMD5 =~ "1234567890abcdef1234567890abcdef" or TargetProcessSHA1 =~ "1234567890abcdef1234567890abcdef12345678" or TargetProcessSHA256 =~ "1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef" or TargetProcessIMPHASH =~ "1234567890abcdef1234567890abcdef" or TargetProcessSHA512 =~ "1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef"' + ] + + assert asim_backend.convert(SigmaCollection.from_yaml(yaml_rule)) == expected_result + assert asim_backend.convert_rule(SigmaRule.from_yaml(yaml_rule)) == expected_result + + +def test_sentinel_asim_fileevent_hashes_field_values(asim_backend): + yaml_rule = """ + title: Test FileEvent Hashes Field Values + status: test + logsource: + category: file_event + product: windows + detection: + sel: + Hashes: + - md5=1234567890abcdef1234567890abcdef + - sha1=1234567890abcdef1234567890abcdef12345678 + - sha256=1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef + condition: sel + """ + expected_result = [ + 'imFileEvent\n| where TargetFileMD5 =~ "1234567890abcdef1234567890abcdef" or TargetFileSHA1 =~ "1234567890abcdef1234567890abcdef12345678" or TargetFileSHA256 =~ "1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef"' + ] + + assert asim_backend.convert(SigmaCollection.from_yaml(yaml_rule)) == expected_result + assert asim_backend.convert_rule(SigmaRule.from_yaml(yaml_rule)) == expected_result + + +def test_sentinel_asim_webrequest_hashes_field_values(asim_backend): + yaml_rule = """ + title: Test WebRequest Hashes Field Values + status: test + logsource: + category: proxy + product: windows + detection: + sel: + Hashes: + - md5=1234567890abcdef1234567890abcdef + - sha1=1234567890abcdef1234567890abcdef12345678 + - sha256=1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef + condition: sel + """ + expected_result = [ + 'imWebSession\n| where FileMD5 =~ "1234567890abcdef1234567890abcdef" or FileSHA1 =~ "1234567890abcdef1234567890abcdef12345678" or FileSHA256 =~ "1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef"' + ] + + assert asim_backend.convert(SigmaCollection.from_yaml(yaml_rule)) == expected_result + assert asim_backend.convert_rule(SigmaRule.from_yaml(yaml_rule)) == expected_result