Skip to content

Commit

Permalink
Get backend name conditionally plus slight refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
mostafa committed Jul 5, 2023
1 parent 85279fd commit 4226f3e
Showing 1 changed file with 65 additions and 9 deletions.
74 changes: 65 additions & 9 deletions sigma/plugins.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import importlib.metadata
import inspect
import pkgutil
import re
import subprocess
import sys
from typing import Callable, Dict, Any, List, Optional, Set, Union, get_type_hints
Expand Down Expand Up @@ -55,6 +56,7 @@ def _discover_module_directories(
result = dict()

def is_pipeline(obj):
"""Checks if an object is a pipeline."""
return any(
[
inspect.isclass(obj) and issubclass(obj, Pipeline),
Expand All @@ -64,6 +66,20 @@ def is_pipeline(obj):
]
)

def is_validator(obj):
"""Checks if an object is a validator."""
return all(
[
inspect.isclass(obj),
issubclass(obj, SigmaRuleValidator),
obj.__module__ != "sigma.validators.base",
]
)

def is_backend(obj):
"""Checks if an object is a backend."""
return inspect.isclass(obj) and issubclass(obj, Backend)

if include:
for mod in pkgutil.iter_modules(module.__path__, module.__name__ + "."):
# attempt to merge backend directory from module into collected backend directory
Expand Down Expand Up @@ -125,25 +141,28 @@ def is_pipeline(obj):
if is_pipeline(possible_obj) or inspect.isfunction(
possible_obj
):
if inspect.isclass(possible_obj) and issubclass(possible_obj, Pipeline):
if inspect.isclass(possible_obj) and issubclass(
possible_obj, Pipeline
):
result[obj_name] = possible_obj()
else:
result[obj_name] = possible_obj
elif directory_name == "validators":
for cls_name in submodules:
if (
inspect.isclass(submodules[cls_name])
and issubclass(submodules[cls_name], SigmaRuleValidator)
and submodules[cls_name].__module__
!= "sigma.validators.base"
):
if is_validator(submodules[cls_name]):
result[cls_name] = submodules[cls_name]
elif directory_name == "backends":
# Backends reside on the module level
for cls_name in imported_module.__dict__:
klass = getattr(imported_module, cls_name)
if inspect.isclass(klass) and issubclass(klass, Backend):
result.update({cls_name: klass})
if is_backend(klass):
result.update(
{
InstalledSigmaPlugins._get_backend_name(
klass, cls_name
): klass
}
)
else:
raise ValueError(
f"Unknown directory name {directory_name} for module {mod.name}"
Expand Down Expand Up @@ -183,6 +202,43 @@ def get_pipeline_resolver(self) -> ProcessingPipelineResolver:
}
)

@staticmethod
def _get_backend_name(obj: Any, default: str) -> Optional[str]:
"""
Get the name of a backend object. This is either the name attribute of the object,
the __name__ attribute of the object, or the __class__ attribute of the object.
The name is then converted to snake_case. If the name is empty, the default is returned.
Args:
obj: The Backend object to get the name from.
default: The default name to return if no name could be found.
Returns:
The name of the backend object in snake_case or the default name.
"""
try:
name = getattr(obj, "name", None)
if name:
return name

name = (
getattr(obj, "__name__", None) or getattr(obj, "__class__", None)
if name is None
else name
)

if name:
name = name.removesuffix("Backend")
words = re.findall(r"[A-Z](?:[A-Z]*(?![a-z])|[a-z]*)", name)
if len(words) == 0:
return name.lower()
rebuilt_name = "_".join(words).lower()
return rebuilt_name
else:
return default
except:
return default


class SigmaPluginType(EnumLowercaseStringMixin, Enum):
BACKEND = auto()
Expand Down

0 comments on commit 4226f3e

Please sign in to comment.