diff --git a/sigma/plugins.py b/sigma/plugins.py index 0349971..ffc6bb4 100644 --- a/sigma/plugins.py +++ b/sigma/plugins.py @@ -5,6 +5,7 @@ import importlib.metadata import inspect import pkgutil +import re import subprocess import sys from typing import Callable, Dict, Any, List, Optional, Set, Union, get_type_hints @@ -55,6 +56,7 @@ def _discover_module_directories( result = dict() def is_pipeline(obj): + """Checks if an object is a pipeline.""" return any( [ inspect.isclass(obj) and issubclass(obj, Pipeline), @@ -64,6 +66,20 @@ def is_pipeline(obj): ] ) + def is_validator(obj): + """Checks if an object is a validator.""" + return all( + [ + inspect.isclass(obj), + issubclass(obj, SigmaRuleValidator), + obj.__module__ != "sigma.validators.base", + ] + ) + + def is_backend(obj): + """Checks if an object is a backend.""" + return inspect.isclass(obj) and issubclass(obj, Backend) + if include: for mod in pkgutil.iter_modules(module.__path__, module.__name__ + "."): # attempt to merge backend directory from module into collected backend directory @@ -125,25 +141,28 @@ def is_pipeline(obj): if is_pipeline(possible_obj) or inspect.isfunction( possible_obj ): - if inspect.isclass(possible_obj) and issubclass(possible_obj, Pipeline): + if inspect.isclass(possible_obj) and issubclass( + possible_obj, Pipeline + ): result[obj_name] = possible_obj() else: result[obj_name] = possible_obj elif directory_name == "validators": for cls_name in submodules: - if ( - inspect.isclass(submodules[cls_name]) - and issubclass(submodules[cls_name], SigmaRuleValidator) - and submodules[cls_name].__module__ - != "sigma.validators.base" - ): + if is_validator(submodules[cls_name]): result[cls_name] = submodules[cls_name] elif directory_name == "backends": # Backends reside on the module level for cls_name in imported_module.__dict__: klass = getattr(imported_module, cls_name) - if inspect.isclass(klass) and issubclass(klass, Backend): - result.update({cls_name: klass}) + if is_backend(klass): + result.update( + { + InstalledSigmaPlugins._get_backend_name( + klass, cls_name + ): klass + } + ) else: raise ValueError( f"Unknown directory name {directory_name} for module {mod.name}" @@ -183,6 +202,43 @@ def get_pipeline_resolver(self) -> ProcessingPipelineResolver: } ) + @staticmethod + def _get_backend_name(obj: Any, default: str) -> Optional[str]: + """ + Get the name of a backend object. This is either the name attribute of the object, + the __name__ attribute of the object, or the __class__ attribute of the object. + The name is then converted to snake_case. If the name is empty, the default is returned. + + Args: + obj: The Backend object to get the name from. + default: The default name to return if no name could be found. + + Returns: + The name of the backend object in snake_case or the default name. + """ + try: + name = getattr(obj, "name", None) + if name: + return name + + name = ( + getattr(obj, "__name__", None) or getattr(obj, "__class__", None) + if name is None + else name + ) + + if name: + name = name.removesuffix("Backend") + words = re.findall(r"[A-Z](?:[A-Z]*(?![a-z])|[a-z]*)", name) + if len(words) == 0: + return name.lower() + rebuilt_name = "_".join(words).lower() + return rebuilt_name + else: + return default + except: + return default + class SigmaPluginType(EnumLowercaseStringMixin, Enum): BACKEND = auto()