# Copyright (C) 2015, Wazuh Inc.
# Created by Wazuh, Inc. .
# This program is free software; you can redistribute it and/or modify it under the terms of GPLv2
"""Setuptools packaging script for the workflow_engine module."""
import json
import os
from pathlib import Path

from setuptools import setup


def get_files_from_directory(directory):
    """Collect data files (.yaml/.json/.md/.py) found under *directory*.

    Args:
        directory (str): Directory to walk, relative to this setup.py.

    Returns:
        list[str]: Paths relative to the package root, suitable for
        setuptools' ``package_data``.
    """
    paths = []
    for (path, _directories, filenames) in os.walk(directory):
        for filename in filenames:
            if filename.endswith(('.yaml', '.json', '.md', '.py')):
                # BUG FIX: the previous code prefixed each entry with
                # Path(__file__) — the absolute path of setup.py itself —
                # yielding nonexistent paths like .../setup.py/workflow_engine/...
                # package_data entries must be relative to the package.
                paths.append(os.path.join(path, filename))
    return paths


def get_version():
    """Read the project version string from ../version.json.

    Returns:
        str | None: The "version" value, or None if it is empty/falsy.

    Raises:
        FileNotFoundError: If version.json does not exist.
        KeyError: If the JSON document has no "version" entry.
    """
    abs_path = Path(__file__).parent.parent / "version.json"

    if not abs_path.exists():
        raise FileNotFoundError(f'File "{abs_path}" not found.')

    with open(abs_path, 'r') as abs_file:
        data = json.load(abs_file)
    return data['version'] or None


package_data_list = get_files_from_directory("workflow_engine")
scripts_list = ['engine=workflow_engine.__main__:main']

setup(
    name='workflow_engine',
    version=get_version(),
    description='Wazuh testing utilities to help programmers automate deployment tests',
    url='https://github.com/wazuh',
    author='Wazuh',
    author_email='hello@wazuh.com',
    license='GPLv2',
    packages=['workflow_engine'],
    package_dir={'workflow_engine': 'workflow_engine'},
    package_data={'workflow_engine': package_data_list},
    entry_points={'console_scripts': scripts_list},
    include_package_data=True,
    zip_safe=False
)
+ + >Note: It is possible to find some fixture examples in deployability/modules/workflow_engine/examples/ + + Example: + + ```bash + version: 0.1 + description: This workflow is used to test agents deployment por DDT1 PoC + variables: + agents-os: + - linux-ubuntu-22.04-amd64 + manager-os: linux-ubuntu-22.04-amd64 + infra-provider: vagrant + working-dir: /tmp/dtt1-poc + + tasks: + # Generic agent test task + - task: "run-agent-tests-{agent}" + description: "Run tests uninstall for the {agent} agent." + do: + this: process + with: + path: python3 + args: + - modules/testing/main.py + - inventory: "{working-dir}/agent-{agent}/inventory.yaml" + - dependencies: + - manager: "{working-dir}/manager-{manager-os}/inventory.yaml" + - agent: "{working-dir}/agent-{agent}/inventory.yaml" + - tests: "install,register,stop" + - component: "agent" + - wazuh-version: "4.7.1" + - wazuh-revision: "40709" + depends-on: + - "provision-install-{agent}" + - "provision-manager" + foreach: + - variable: agents-os + as: agent + + # Generic agent test task + - task: "run-agent-tests-uninstall-{agent}" + description: "Run tests uninstall for the {agent} agent." + do: + this: process + with: + path: python3 + args: + - modules/testing/main.py + - inventory: "{working-dir}/agent-{agent}/inventory.yaml" + - dependencies: + - manager: "{working-dir}/manager-{manager-os}/inventory.yaml" + - tests: "uninstall" + - component: "agent" + - wazuh-version: "4.7.1" + - wazuh-revision: "40709" + depends-on: + - "run-agent-tests-{agent}" + - "provision-uninstall-{agent}" + foreach: + - variable: agents-os + as: agent + + # Unique manager provision task + - task: "provision-manager" + description: "Provision the manager." 
+ do: + this: process + with: + path: python3 + args: + - modules/provision/main.py + - inventory-manager: "{working-dir}/manager-{manager-os}/inventory.yaml" + - install: + - component: wazuh-manager + type: package + depends-on: + - "allocate-manager" + + # Unique manager allocate task + - task: "allocate-manager" + description: "Allocate resources for the manager." + do: + this: process + with: + path: python3 + args: + - modules/allocation/main.py + - action: create + - provider: "{infra-provider}" + - size: large + - composite-name: "{manager-os}" + - inventory-output: "{working-dir}/manager-{manager-os}/inventory.yaml" + - track-output: "{working-dir}/manager-{manager-os}/track.yaml" + cleanup: + this: process + with: + path: python3 + args: + - modules/allocation/main.py + - action: delete + - track-output: "{working-dir}/manager-{manager-os}/track.yaml" + + # Generic agent provision task + - task: "provision-install-{agent}" + description: "Provision resources for the {agent} agent." + do: + this: process + with: + path: python3 + args: + - modules/provision/main.py + - inventory-agent: "{working-dir}/agent-{agent}/inventory.yaml" + - inventory-manager: "{working-dir}/manager-{manager-os}/inventory.yaml" + - install: + - component: wazuh-agent + type: package + - component: curl + depends-on: + - "allocate-{agent}" + - "provision-manager" + foreach: + - variable: agents-os + as: agent + + # Generic agent provision task + - task: "provision-uninstall-{agent}" + description: "Provision resources for the {agent} agent." 
+ do: + this: process + with: + path: python3 + args: + - modules/provision/main.py + - inventory-agent: "{working-dir}/agent-{agent}/inventory.yaml" + - inventory-manager: "{working-dir}/manager-{manager-os}/inventory.yaml" + - uninstall: + - component: wazuh-agent + type: package + depends-on: + - "provision-install-{agent}" + foreach: + - variable: agents-os + as: agent + + # Generic agent allocate task + - task: "allocate-{agent}" + description: "Allocate resources for the {agent} agent." + do: + this: process + with: + path: python3 + args: + - modules/allocation/main.py + - action: create + - provider: "{infra-provider}" + - size: small + - composite-name: "{agent}" + - inventory-output: "{working-dir}/agent-{agent}/inventory.yaml" + - track-output: "{working-dir}/agent-{agent}/track.yaml" + cleanup: + this: process + with: + path: python3 + args: + - modules/allocation/main.py + - action: delete + - track-output: "{working-dir}/agent-{agent}/track.yaml" + foreach: + - variable: agents-os + as: agent + ``` + + Following the schema of the example: + + Configure the following parameters depending on your test case: + + ```yaml + variables/agent-os + variables/manager-os + infra-provider + working-dir + tasks + ``` + + Pay attention to the tasks: + + ```yaml + args + depends-on + ``` + + >Note: In args, configure the launcher's path correctly (main.py files in each module), and to fill `depends-on`, consider the steps of your test (allocation, provision, and test) + +7. Execution of Command (local): + + Execute the command by referencing the parameters required by the library (launcher). + + ```bash + python3 -m workflow_engine {.yaml fixture path} + ``` + + Example + + ```bash + python3 -m workflow_engine modules/workflow_engine/examples/dtt1-agents-poc.yaml + ``` + + > Note The command execution can also be mediated through Jenkins. + +--- + +### Technical documentation + +`Workflow Engine` is the orchestrator of the deployability test architecture. 
+ +Its function is to allow the ordered and structured execution in steps of allocation, provision, and testing. + +`The Workflow Engine` receives instructions through a `YAML document`, the structure of which can be exemplified in tests found in: +`wazuh-qa/deployability/modules/workflow_engine/examples` + +**In these tests**: + - Tasks: define the steps. + - Task: defines a step. + +**Within Task**: + - description: description of the task. + - do: instructions for the task. + - this: nature of the task. + - with: tools with which the task will be executed. + - path: executable. + - args: arguments. it receives the binary or file to execute and the parameters. + - depends-on: steps prior to the execution of that task. + - foreach: loop that executes the task on the previously declared hosts. + +```bash +tasks: + # Generic agent test task + - task: "run-agent-tests-{agent}" + description: "Run tests uninstall for the {agent} agent." + do: + this: process + with: + path: python3 + args: + - modules/testing/main.py + - inventory: "{working-dir}/agent-{agent}/inventory.yaml" + - dependencies: + - manager: "{working-dir}/manager-{manager-os}/inventory.yaml" + - agent: "{working-dir}/agent-{agent}/inventory.yaml" + - tests: "install,register,stop" + - component: "agent" + - wazuh-version: "4.7.1" + - wazuh-revision: "40709" + depends-on: + - "provision-install-{agent}" + - "provision-manager" + foreach: + - variable: agents-os + as: agent +``` + +These tasks are executed by the `Workflow Engine` launcher installed as workflow_engine library in your virtual environment. + +This launcher receives the parameters, sets up the test logs, and proceeds with the ordered execution. + +The parameters sent from the launcher are processed by deployability/modules/workflow_engine/models.py, which checks the nature of the parameters sent and filters out incorrect parameters. 
+ +![image](https://github.com/wazuh/wazuh-qa/assets/125690423/32aa77b7-f294-41ac-af93-db8a084dbad1) + +These are then sent to `deployability/modules/workflow_engine/workflow_processor.py`, where using `deployability/modules/schemas`, instructions in YAML are received and the schema of the instructions is checked. + +The commands are executed in the WorkflowProcessor of the same file, which also handles parallel executions and aborts failed executions. + +[WF.drawio.zip](https://github.com/wazuh/wazuh-qa/files/14167559/WF.drawio.zip) + + +### License + +WAZUH Copyright (C) 2015 Wazuh Inc. (License GPLv2) diff --git a/deployability/modules/workflow_engine/__init__.py b/deployability/modules/workflow_engine/__init__.py new file mode 100755 index 0000000000..338f58e9fd --- /dev/null +++ b/deployability/modules/workflow_engine/__init__.py @@ -0,0 +1 @@ +from .workflow_processor import WorkflowProcessor diff --git a/deployability/modules/workflow_engine/__main__.py b/deployability/modules/workflow_engine/__main__.py new file mode 100755 index 0000000000..5d9fe0a0c4 --- /dev/null +++ b/deployability/modules/workflow_engine/__main__.py @@ -0,0 +1,39 @@ +# Copyright (C) 2015, Wazuh Inc. +# Created by Wazuh, Inc. . 
# This program is a free software; you can redistribute it and/or modify it under the terms of GPLv2
"""Command-line entry point for the workflow engine."""

import os
import sys
import argparse
import signal

# Make the deployability project root importable when run as a script.
project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '../..'))
sys.path.append(project_root)

from workflow_engine.workflow_processor import WorkflowProcessor
from workflow_engine.models import InputPayload


def parse_arguments() -> argparse.Namespace:
    """Parse and return the command line arguments.

    Returns:
        argparse.Namespace: Parsed CLI options (workflow_file, threads,
        dry_run, log_level, schema_file).
    """
    parser = argparse.ArgumentParser(description='Execute tasks in a workflow.')
    parser.add_argument('workflow_file', type=str,
                        help='Path to the workflow file (YAML format).')
    parser.add_argument('--threads', type=int, default=1, required=False,
                        help='Number of threads to use for parallel execution.')
    parser.add_argument('--dry-run', action='store_true', required=False,
                        help='Display the plan without executing tasks.')
    parser.add_argument('--log-level', choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
                        default='INFO', help='Log level.')
    parser.add_argument('--schema_file', required=False, type=str,
                        help='Path to the schema file (YAML format)')
    return parser.parse_args()


def main() -> None:
    """Main entry point: validate arguments and run the workflow."""
    try:
        args = parse_arguments()
        # InputPayload validates/coerces the raw argparse values before they
        # reach the processor.
        processor = WorkflowProcessor(**dict(InputPayload(**vars(args))))
        # Allow Ctrl+C to abort the workflow gracefully.
        signal.signal(signal.SIGINT, processor.handle_interrupt)
        processor.run()
    except Exception as e:
        # BUG FIX: the previous message said "Error while provisioning",
        # copied from the provision module; this is the workflow engine.
        sys.exit(f"Error while executing workflow: {e}")


if __name__ == "__main__":
    main()
+# This program is a free software; you can redistribute it and/or modify it under the terms of GPLv2 +version: 0.1 +description: This workflow is used to test agents deployment with a single manager. +variables: + agents-os: + - linux-ubuntu-22.04-amd64 + manager-os: linux-ubuntu-22.04-amd64 + infra-provider: vagrant + working-dir: /tmp/dtt1 + +tasks: + # Generic agent test task + - task: "run-agent-tests-{agent}" + description: "Run tests uninstall for the {agent} agent." + do: + this: process + with: + path: python3 + args: + - modules/testing/main.py + - inventory: "{working-dir}/agent-{agent}/inventory.yaml" + - dependencies: + - manager: "{working-dir}/manager-{manager-os}/inventory.yaml" + - agent: "{working-dir}/agent-{agent}/inventory.yaml" + - tests: "install,register,stop" + - component: "agent" + - wazuh-version: "4.7.1" + - wazuh-revision: "40709" + depends-on: + - "provision-install-{agent}" + - "provision-manager" + foreach: + - variable: agents-os + as: agent + + # Generic agent test task + - task: "run-agent-tests-uninstall-{agent}" + description: "Run tests uninstall for the {agent} agent." + do: + this: process + with: + path: python3 + args: + - modules/testing/main.py + - inventory: "{working-dir}/agent-{agent}/inventory.yaml" + - dependencies: + - manager: "{working-dir}/manager-{manager-os}/inventory.yaml" + - tests: "uninstall" + - component: "agent" + - wazuh-version: "4.7.1" + - wazuh-revision: "40709" + depends-on: + - "run-agent-tests-{agent}" + - "provision-uninstall-{agent}" + foreach: + - variable: agents-os + as: agent + + # Unique manager provision task + - task: "provision-manager" + description: "Provision the manager." 
+ do: + this: process + with: + path: python3 + args: + - modules/provision/main.py + - inventory-manager: "{working-dir}/manager-{manager-os}/inventory.yaml" + - install: + - component: wazuh-manager + type: aio + version: "4.7.0" + depends-on: + - "allocate-manager" + + # Unique manager allocate task + - task: "allocate-manager" + description: "Allocate resources for the manager." + do: + this: process + with: + path: python3 + args: + - modules/allocation/main.py + - action: create + - provider: "{infra-provider}" + - size: large + - composite-name: "{manager-os}" + - inventory-output: "{working-dir}/manager-{manager-os}/inventory.yaml" + - track-output: "{working-dir}/manager-{manager-os}/track.yaml" + cleanup: + this: process + with: + path: python3 + args: + - modules/allocation/main.py + - action: delete + - track-output: "{working-dir}/manager-{manager-os}/track.yaml" + + # Generic agent provision task + - task: "provision-install-{agent}" + description: "Provision resources for the {agent} agent." + do: + this: process + with: + path: python3 + args: + - modules/provision/main.py + - inventory-agent: "{working-dir}/agent-{agent}/inventory.yaml" + - inventory-manager: "{working-dir}/manager-{manager-os}/inventory.yaml" + - install: + - component: wazuh-agent + type: aio + version: "4.8.0" + live: False + depends-on: + - "allocate-{agent}" + - "provision-manager" + foreach: + - variable: agents-os + as: agent + + # Generic agent provision task + - task: "provision-uninstall-{agent}" + description: "Provision resources for the {agent} agent." 
+ do: + this: process + with: + path: python3 + args: + - modules/provision/main.py + - inventory-agent: "{working-dir}/agent-{agent}/inventory.yaml" + - inventory-manager: "{working-dir}/manager-{manager-os}/inventory.yaml" + - uninstall: + - component: wazuh-agent + type: package + depends-on: + - "provision-install-{agent}" + foreach: + - variable: agents-os + as: agent + + # Generic agent allocate task + - task: "allocate-{agent}" + description: "Allocate resources for the {agent} agent." + do: + this: process + with: + path: python3 + args: + - modules/allocation/main.py + - action: create + - provider: "{infra-provider}" + - size: small + - composite-name: "{agent}" + - inventory-output: "{working-dir}/agent-{agent}/inventory.yaml" + - track-output: "{working-dir}/agent-{agent}/track.yaml" + cleanup: + this: process + with: + path: python3 + args: + - modules/allocation/main.py + - action: delete + - track-output: "{working-dir}/agent-{agent}/track.yaml" + foreach: + - variable: agents-os + as: agent \ No newline at end of file diff --git a/deployability/modules/workflow_engine/examples/dtt1-agents.yaml b/deployability/modules/workflow_engine/examples/dtt1-agents.yaml new file mode 100755 index 0000000000..e8a827282d --- /dev/null +++ b/deployability/modules/workflow_engine/examples/dtt1-agents.yaml @@ -0,0 +1,120 @@ +# Copyright (C) 2015, Wazuh Inc. +# Created by Wazuh, Inc. . +# This program is a free software; you can redistribute it and/or modify it under the terms of GPLv2 +version: 0.1 +description: This workflow is used to test agents deployment. 
+variables: + agents-os: + - linux-redhat-7-amd64 + - linux-redhat-8-amd64 + - linux-redhat-9-amd64 + - linux-centos-7-amd64 + - linux-centos-8-amd64 + - linux-debian-10-amd64 + - linux-debian-11-amd64 + - linux-debian-12-amd64 + - linux-ubuntu-18.04-amd64 + - linux-ubuntu-20.04-amd64 + - linux-ubuntu-22.04-amd64 + - linux-fedora-37-amd64 + - linux-fedora-38-amd64 + - linux-suse-15-amd64 + - linux-opensuse-15-amd64 + - linux-oracle-9-amd64 + - linux-amazon-2-amd64 + - linux-amazon-2023-amd64 + - windows-10-amd64 + - windows-11-amd64 + - windows-server2012-amd64 + - windows-server2016-amd64 + - windows-server2019-amd64 + - windows-server2022-amd64 + - macos-13.3-amd64 + - macos-14.2-amd64 + manager-os: linux-amazon-2023-amd64 + +tasks: + # Generic agent test task + - task: "test-agent-{agent}" + description: "Run tests for the {agent} agent." + do: + this: process + with: + path: /bin/echo + args: + - -n + - "Running tests for {agent}" + depends-on: + - "provision-agent-{agent}" + foreach: + - variable: agents-os + as: agent + + # Unique manager provision task + - task: "provision-manager-{manager-os}" + description: "Provision the manager." + do: + this: process + with: + path: /bin/echo + args: + - -n + - "Running provision for manager" + depends-on: + - "allocate-manager-{manager-os}" + + # Unique manager allocate task + - task: "allocate-manager-{manager-os}" + description: "Allocate resources for the manager." + do: + this: process + with: + path: /bin/echo + args: + - -n + - "Running allocate for manager" + cleanup: + this: process + with: + path: /bin/echo + args: + - -n + - "Running cleanup for manager" + + # Generic agent provision task + - task: "provision-agent-{agent}" + description: "Provision resources for the {agent} agent." 
+ do: + this: process + with: + path: /bin/echo + args: + - -n + - "Running provision for {agent}" + depends-on: + - "allocate-agent-{agent}" + - "provision-manager-{manager-os}" + foreach: + - variable: agents-os + as: agent + + # Generic agent allocate task + - task: "allocate-agent-{agent}" + description: "Allocate resources for the {agent} agent." + do: + this: process + with: + path: /bin/echo + args: + - -n + - "Running allocate for {agent}" + cleanup: + this: process + with: + path: /bin/echo + args: + - -n + - "Running cleanup for allocate for {agent}" + foreach: + - variable: agents-os + as: agent \ No newline at end of file diff --git a/deployability/modules/workflow_engine/examples/dtt1-managers.yaml b/deployability/modules/workflow_engine/examples/dtt1-managers.yaml new file mode 100755 index 0000000000..503cd115a3 --- /dev/null +++ b/deployability/modules/workflow_engine/examples/dtt1-managers.yaml @@ -0,0 +1,103 @@ +# Copyright (C) 2015, Wazuh Inc. +# Created by Wazuh, Inc. . +# This program is a free software; you can redistribute it and/or modify it under the terms of GPLv2 +version: 0.1 +description: This workflow is used to test managers deployment. Two agents per manager are deployed. +variables: + agents-os: + - linux-debian-12-amd64 + - linux-ubuntu-22.04-amd64 + managers-os: + - linux-redhat-7-amd64 + - linux-redhat-8-amd64 + - linux-redhat-9-amd64 + - linux-centos-7-amd64 + - linux-centos-8-amd64 + - linux-debian-10-amd64 + - linux-debian-11-amd64 + - linux-debian-12-amd64 + - linux-ubuntu-18.04-amd64 + - linux-ubuntu-20.04-amd64 + - linux-ubuntu-22.04-amd64 + - linux-fedora-37-amd64 + - linux-fedora-38-amd64 + - linux-suse-15-amd64 + - linux-opensuse-15-amd64 + - linux-oracle-9-amd64 + - linux-amazon-2-amd64 + - linux-amazon-2023-amd64 +tasks: + # Generic manager test task + - task: "test-{manager}-{agent}" + do: + this: process + with: + path: /bin/echo + args: + - Executing tests for {manager} manager with {agent} agent. 
+ depends-on: + - "provision-{manager}-manager" + - "provision-{agent}-agent-for-{manager}-manager" + foreach: + - variable: managers-os + as: manager + - variable: agents-os + as: agent + + # --------- Provision -------------- + # Generic manager provision task + - task: "provision-{manager}-manager" + do: + this: process + with: + path: /bin/echo + args: + - Executing provision for {manager} as a manager. + depends-on: + - "allocate-{manager}-manager" + foreach: + - variable: managers-os + as: manager + + # Generic agent provision task + - task: "provision-{agent}-agent-for-{manager}-manager" + do: + this: process + with: + path: /bin/echo + args: + - Executing provision for {agent} as an agent. + depends-on: + - "allocate-{agent}-agent-for-{manager}-manager" + foreach: + - variable: managers-os + as: manager + - variable: agents-os + as: agent + + # --------- Allocate -------------- + # Generic manager allocate task + - task: "allocate-{manager}-manager" + do: + this: process + with: + path: /bin/echo + args: + - Executing allocation for {manager} as a manager. + foreach: + - variable: managers-os + as: manager + + # Generic agent allocate task + - task: "allocate-{agent}-agent-for-{manager}-manager" + do: + this: process + with: + path: /bin/echo + args: + - Executing allocation for {agent} as an agent. 
+ foreach: + - variable: managers-os + as: manager + - variable: agents-os + as: agent diff --git a/deployability/modules/workflow_engine/logger/__init__.py b/deployability/modules/workflow_engine/logger/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/deployability/modules/workflow_engine/logger/config.yaml b/deployability/modules/workflow_engine/logger/config.yaml new file mode 100644 index 0000000000..764f4cc7a3 --- /dev/null +++ b/deployability/modules/workflow_engine/logger/config.yaml @@ -0,0 +1,28 @@ +version: 1 +formatters: + simple: + format: '[%(asctime)s] [%(levelname)s] [%(process)d] [%(threadName)s] [%(name)s]: %(message)s' + colored: + (): colorlog.ColoredFormatter + format: '%(log_color)s[%(asctime)s] [%(levelname)s] [%(process)d] [%(threadName)s] [%(name)s]: %(message)s' + datefmt: '%Y-%m-%d %H:%M:%S' + log_colors: + DEBUG: cyan + INFO: green + WARNING: yellow + ERROR: red + CRITICAL: red,bg_white +handlers: + console: + class: colorlog.StreamHandler + level: DEBUG + formatter: colored + stream: ext://sys.stdout + file: + class: logging.FileHandler + level: DEBUG + formatter: simple + filename: /tmp/flowbacca.log +root: + level: DEBUG + handlers: [console, file] diff --git a/deployability/modules/workflow_engine/logger/logger.py b/deployability/modules/workflow_engine/logger/logger.py new file mode 100644 index 0000000000..e090ff0c57 --- /dev/null +++ b/deployability/modules/workflow_engine/logger/logger.py @@ -0,0 +1,26 @@ +# Copyright (C) 2015, Wazuh Inc. +# Created by Wazuh, Inc. . +# This program is a free software; you can redistribute it and/or modify it under the terms of GPLv2 + +import logging +import logging.config +from pathlib import Path +import threading +import os + +import yaml + +def _load_config() -> None: + """ + Loads the logging configuration from 'config.yaml' file. 
+ """ + config_path = Path(__file__).parent / 'config.yaml' + + if os.path.exists(config_path): + with open(config_path, 'r') as f: + config = yaml.safe_load(f.read()) + logging.config.dictConfig(config) + +_load_config() + +logger = logging.getLogger("workflow_engine") diff --git a/deployability/modules/workflow_engine/models.py b/deployability/modules/workflow_engine/models.py new file mode 100644 index 0000000000..c92d2a868f --- /dev/null +++ b/deployability/modules/workflow_engine/models.py @@ -0,0 +1,15 @@ +# Copyright (C) 2015, Wazuh Inc. +# Created by Wazuh, Inc. . +# This program is a free software; you can redistribute it and/or modify it under the terms of GPLv2 + +from pathlib import Path +from typing import Literal +from pydantic import BaseModel + + +class InputPayload(BaseModel): + workflow_file: str | Path + threads: int = 1 + dry_run: bool = False + log_level: Literal['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'] = 'INFO' + schema_file: str | Path | None = None diff --git a/deployability/modules/workflow_engine/schema_validator.py b/deployability/modules/workflow_engine/schema_validator.py new file mode 100755 index 0000000000..d04a9bb39d --- /dev/null +++ b/deployability/modules/workflow_engine/schema_validator.py @@ -0,0 +1,90 @@ +# Copyright (C) 2015, Wazuh Inc. +# Created by Wazuh, Inc. . +# This program is a free software; you can redistribute it and/or modify it under the terms of GPLv2 + +import jsonschema +import json +import os + +from jsonschema.exceptions import ValidationError +from pathlib import Path +from ruamel.yaml import YAML + +from workflow_engine.logger.logger import logger + +class SchemaValidator: + """ + A SchemaValidator class that validates a YAML file against a JSON schema. + + Attributes: + schema_data (dict): The schema data. + yaml_data (dict): The YAML data. + """ + + def __init__(self, schema: Path | str, to_validate: Path | str): + """ + Initializes the SchemaValidator object. 
+ + Args: + schema (Path, str): The path to the schema file. + to_validate (Path, str): The path to the YAML file to validate. + """ + schema_data: str = None + yaml_data: str = None + + self.logger = logger + + if not os.path.exists(schema): + raise FileNotFoundError(f'File "{schema}" not found.') + + with open(schema, 'r') as schema_file: + self.logger.debug(f"Loading schema file: {schema}") + schema_data = json.load(schema_file) + + if not os.path.exists(to_validate): + raise FileNotFoundError(f'File "{to_validate}" not found.') + + with open(to_validate, 'r') as file: + self.logger.debug(f"Loading yaml file: {to_validate}") + yaml = YAML(typ='safe', pure=True) + yaml_data = yaml.load(file) + + self.schema_data = schema_data + self.yaml_data = yaml_data + + def preprocess_data(self) -> None: + """ + Preprocess the YAML data to be validated. + + Raises: + ValidationError: If the YAML data is not valid. + """ + for task in self.yaml_data.get('tasks', []): + do_with = task.get('do', {}).get('with', {}) + this_value = task.get('do', {}).get('this', '') + + if this_value == 'process': + if 'path' not in do_with or 'args' not in do_with: + raise ValidationError(f"Missing required properties in 'with' for task: {task}") + + do_with = task.get('cleanup', {}).get('with', {}) + this_value = task.get('cleanup', {}).get('this', '') + + if this_value == 'process': + if 'path' not in do_with or 'args' not in do_with: + raise ValidationError(f"Missing required properties in 'with' for task: {task}") + + def validateSchema(self) -> None: + """ + Validate the Workflow schema + + Raises: + ValidationError: If the YAML data is not valid. + Exception: If an unexpected error occurs. 
+ """ + try: + jsonschema.validate(self.yaml_data, self.schema_data) + except ValidationError as e: + self.logger.error(f"Schema validation error: {e}") + except Exception as e: + self.logger.error(f"Unexpected error at schema validation: {e}") diff --git a/deployability/modules/workflow_engine/schemas/schema_v1.json b/deployability/modules/workflow_engine/schemas/schema_v1.json new file mode 100644 index 0000000000..81f8e7c587 --- /dev/null +++ b/deployability/modules/workflow_engine/schemas/schema_v1.json @@ -0,0 +1,118 @@ +{ + "type": "object", + "properties": { + "name": {"type": "string"}, + "description": {"type": "string"}, + "version": {"type": "number"}, + "tasks": { + "type": "array", + "items": { + "type": "object", + "properties": { + "task": {"type": "string"}, + "do": { + "type": "object", + "properties": { + "this": {"type": "string"}, + "with": { + "type": "object", + "properties": { + "this": {"type": "string"}, + "args": { + "type": "array", + "items": { + "oneOf": [ + {"type": "string"}, + {"type": "array"}, + {"type": "object"} + ] + } + }, + "path": {"type": "string"} + } + } + }, + "required": ["this"] + }, + "cleanup": { + "type": "object", + "properties": { + "this": {"type": "string"}, + "with": { + "type": "object", + "properties": { + "this": {"type": "string"}, + "args": { + "type": "array", + "items": { + "oneOf": [ + {"type": "string"}, + {"type": "array"}, + {"type": "object"} + ] + } + }, + "path": {"type": "string"} + } + } + }, + "required": ["this"] + }, + "depends-on": { + "type": "array", + "items": {"type": "string"} + }, + "foreach": { + "type": "array", + "items": { + "type": "object", + "properties": { + "variable": {"type": "string"}, + "as": {"type": "string"}, + "foreach": { + "type": "array", + "items": { + "type": "object", + "properties": { + "variable": {"type": "string"}, + "as": {"type": "string"} + } + } + } + }, + "required": ["variable", "as"] + } + } + }, + "required": ["task", "do"] + + }, + "minItems": 1 + }, 
class Task(ABC):
    """Abstract base class for executable workflow tasks."""

    @abstractmethod
    def execute(self) -> None:
        """Execute the task."""
        pass


class ProcessTask(Task):
    """Task that runs an external process and raises on failure."""

    def __init__(self, task_name: str, task_parameters: dict):
        """
        Initialize ProcessTask.

        Args:
            task_name (str): Name of the task.
            task_parameters (dict): Parameters for the task. Expected keys:
                'path' (str): executable to run, and 'args' (list): arguments,
                each either a plain string or a single-entry dict rendered as
                --key=value (a list value expands to one flag per item).
        """
        self.task_name = task_name
        self.task_parameters = task_parameters
        self.logger = logger

    def execute(self) -> None:
        """Run the configured process and log its output.

        Raises:
            ValueError: If 'args' is missing from the task parameters.
            KeyboardInterrupt: If the child process reported a keyboard interrupt.
            Exception: If the process exits with a non-zero status.
        """
        if self.task_parameters.get('args') is None:
            # Fixed wording of the error message ("Not argument found" -> below).
            raise ValueError(f'No arguments found in {self.task_name}')

        task_args = []
        for arg in self.task_parameters['args']:
            if isinstance(arg, str):
                task_args.append(arg)
            elif isinstance(arg, dict):
                key, value = next(iter(arg.items()))
                if isinstance(value, list):
                    task_args.extend(f"--{key}={item}" for item in value)
                else:
                    task_args.append(f"--{key}={value}")
            else:
                # Unparseable args are logged and skipped (best-effort, as before).
                self.logger.error(f'Could not parse arguments {arg}')

        self.logger.debug(f'Running task "{self.task_name}" with arguments: {task_args}')

        try:
            # check=True already raises CalledProcessError on a non-zero exit,
            # so the former manual returncode check was dead code and is removed.
            result = subprocess.run(
                [self.task_parameters['path']] + task_args,
                check=True,
                capture_output=True,
                text=True,
            )
            self.logger.debug(f'Finished task "{self.task_name}" execution with result:\n{str(result.stdout)}')
        except subprocess.CalledProcessError as e:
            # Guard against stderr being None/empty before substring test.
            error_msg = e.stderr or ''
            if "KeyboardInterrupt" in error_msg:
                raise KeyboardInterrupt("Error executing process task with keyboard interrupt.")
            raise Exception(f"Error executing process task {e.stderr}")


class DummyTask(Task):
    """No-op task that only logs its configured message (for testing workflows)."""

    def __init__(self, task_name, task_parameters):
        self.task_name = task_name
        self.task_parameters = task_parameters

    def execute(self):
        message = self.task_parameters.get('message', 'No message provided')
        logger.info("%s: %s", message, self.task_name, extra={'tag': self.task_name})
time_interval[1]) + + message = self.task_parameters.get('message', 'No message provided') + logger.info("%s: %s (Sleeping for %.2f seconds)", message, self.task_name, sleep_time, extra={'tag': self.task_name}) + + time.sleep(sleep_time) + +TASKS_HANDLERS = { + 'process': ProcessTask, + 'dummy': DummyTask, + 'dummy-random': DummyRandomTask, +} diff --git a/deployability/modules/workflow_engine/workflow_processor.py b/deployability/modules/workflow_engine/workflow_processor.py new file mode 100755 index 0000000000..bd4b19ef7f --- /dev/null +++ b/deployability/modules/workflow_engine/workflow_processor.py @@ -0,0 +1,384 @@ +# Copyright (C) 2015, Wazuh Inc. +# Created by Wazuh, Inc. . +# This program is a free software; you can redistribute it and/or modify it under the terms of GPLv2 + +import concurrent.futures +import graphlib +import json +import time +import yaml +import os + +from pathlib import Path +from itertools import product + +from workflow_engine.logger.logger import logger +from workflow_engine.schema_validator import SchemaValidator +from workflow_engine.task import Task, TASKS_HANDLERS + +class WorkflowFile: + """Class for loading and processing a workflow file.""" + schema_path = Path(__file__).parent / 'schemas' / 'schema_v1.json' + + def __init__(self, workflow_file_path: Path | str, schema_path: Path | str = None) -> None: + self.schema_path = schema_path or self.schema_path + self.__validate_schema(workflow_file_path) + self.workflow_raw_data = self.__load_workflow(workflow_file_path) + self.task_collection = self.__process_workflow() + self.__static_workflow_validation() + + def __validate_schema(self, workflow_file: Path | str) -> None: + """ + Validate the workflow file against the schema. + + Args: + workflow_file (Path | str): Path to the workflow file. 
+ """ + try: + logger.debug(f"Validating input file: {workflow_file}") + validator = SchemaValidator(self.schema_path, workflow_file) + validator.preprocess_data() + validator.validateSchema() + except Exception as e: + logger.error("Error while validating schema [%s] with error: %s", self.schema_path, e) + raise + + def __load_workflow(self, file_path: str) -> dict: + """ + Load the workflow data from a file. + + Args: + file_path (str): Path to the workflow file. + + Returns: + dict: Workflow data. + """ + + if not os.path.exists(file_path): + raise FileNotFoundError(f'File "{file_path}" not found.') + + logger.debug(f"Loading workflow file: {file_path}") + + with open(file_path, 'r', encoding='utf-8') as file: + return yaml.safe_load(file) + + def __process_workflow(self): + """Process the workflow and return a list of tasks.""" + logger.debug("Process workflow.") + task_collection = [] + variables = self.workflow_raw_data.get('variables', {}) + for task in self.workflow_raw_data.get('tasks', []): + task_collection.extend(self.__expand_task(task, variables)) + + if not task_collection: + raise ValueError("No tasks found in the workflow.") + return task_collection + + def __replace_placeholders(self, element: str, values: dict, parent_key: str = None): + """ + Recursively replace placeholders in a dictionary or list. + + Args: + element (Any): The element to process. + values (dict): The values to replace placeholders. + parent_key (str): The parent key for nested replacements. + + Returns: + Any: The processed element. 
+ """ + if isinstance(element, dict): + return {key: self.__replace_placeholders(value, values, key) for key, value in element.items()} + if isinstance(element, list): + return [self.__replace_placeholders(sub_element, values, parent_key) for sub_element in element] + if isinstance(element, str): + return element.format_map(values) + return element + + def __expand_task(self, task: dict, variables: dict): + """ + Expand a task with variable values. + + Args: + task (dict): The task to expand. + variables (dict): Variable values. + + Returns: + List[dict]: List of expanded tasks. + """ + expanded_tasks = [] + + if 'foreach' in task: + loop_variables = task.get('foreach', [{}]) + + variable_names = [loop_variable_data.get('variable') for loop_variable_data in loop_variables] + as_identifiers = [loop_variable_data.get('as') for loop_variable_data in loop_variables] + + variable_values = [variables.get(name, []) for name in variable_names] + + for combination in product(*variable_values): + variables_with_items = {**variables, **dict(zip(as_identifiers, combination))} + expanded_tasks.append(self.__replace_placeholders(task, variables_with_items)) + else: + expanded_tasks.append(self.__replace_placeholders(task, variables)) + + return expanded_tasks + + def __static_workflow_validation(self): + """Validate the workflow against static criteria.""" + def check_duplicated_tasks(self): + """Validate task name duplication.""" + task_name_counts = {task['task']: 0 for task in self.task_collection} + + for task in self.task_collection: + task_name_counts[task['task']] += 1 + + duplicates = [name for name, count in task_name_counts.items() if count > 1] + + if duplicates: + raise ValueError(f"Duplicated task names: {', '.join(duplicates)}") + + def check_not_existing_tasks(self): + """Validate task existance.""" + task_names = {task['task'] for task in self.task_collection} + + for dependencies in [task.get('depends-on', []) for task in self.task_collection]: + 
non_existing_dependencies = [dependency for dependency in dependencies if dependency not in task_names] + if non_existing_dependencies: + raise ValueError(f"Tasks do not exist: {', '.join(non_existing_dependencies)}") + + validations = [check_duplicated_tasks, check_not_existing_tasks] + for validation in validations: + validation(self) + + +class DAG(): + """Class for creating a dependency graph.""" + def __init__(self, task_collection: list, reverse: bool = False): + self.task_collection = task_collection + self.reverse = reverse + self.dag, self.dependency_tree = self.__build_dag() + self.to_be_canceled = set() + self.finished_tasks_status = { + 'failed': set(), + 'canceled': set(), + 'successful': set(), + } + self.execution_plan = self.__create_execution_plan(self.dependency_tree) + self.dag.prepare() + + def is_active(self) -> bool: + """Check if the DAG is active.""" + return self.dag.is_active() + + def get_available_tasks(self) -> list: + """Get the available tasks.""" + return self.dag.get_ready() + + def get_execution_plan(self) -> dict: + """Get the execution plan.""" + return self.execution_plan + + def set_status(self, task_name: str, status: str): + """Set the status of a task.""" + self.finished_tasks_status[status].add(task_name) + self.dag.done(task_name) + + def should_be_canceled(self, task_name: str) -> bool: + """Check if a task should be canceled.""" + return task_name in self.to_be_canceled + + def __build_dag(self): + """Build a dependency graph for the tasks.""" + dependency_dict = {} + dag = graphlib.TopologicalSorter() + + for task in self.task_collection: + task_name = task['task'] + dependencies = task.get('depends-on', []) + + if self.reverse: + for dependency in dependencies: + dag.add(dependency, task_name) + else: + dag.add(task_name, *dependencies) + + dependency_dict[task_name] = dependencies + + return dag, dependency_dict + + def cancel_dependant_tasks(self, task_name, cancel_policy) -> None: + """Cancel all tasks that depend 
on a failed task.""" + def get_all_task_set(tasks): + task_set = set() + + for task, sub_tasks in tasks.items(): + task_set.add(task) + task_set.update(get_all_task_set(sub_tasks)) + + return task_set + + if cancel_policy == 'continue': + return + + not_cancelled_tasks = self.finished_tasks_status['failed'].union(self.finished_tasks_status['successful']) + for root_task, sub_tasks in self.execution_plan.items(): + task_set = get_all_task_set({root_task: sub_tasks}) + if cancel_policy == 'abort-all': + self.to_be_canceled.update(task_set) + elif cancel_policy == 'abort-related-flows': + if task_name in task_set: + self.to_be_canceled.update(task_set - not_cancelled_tasks) + else: + raise ValueError(f"Unknown cancel policy '{cancel_policy}'.") + + def __create_execution_plan(self, dependency_dict: dict) -> dict: + + execution_plan = {} + + def get_root_tasks(dependency_dict: dict) -> set: + """Get root tasks from the dependency dictionary.""" + all_tasks = set(dependency_dict.keys()) + dependent_tasks = set(dep for dependents in dependency_dict.values() for dep in dependents) + return all_tasks - dependent_tasks + + def get_subtask_plan(task_name: str, dependency_dict: dict, level: int = 0) -> dict: + """Create the execution plan recursively as a dictionary.""" + if task_name not in dependency_dict: + return {task_name: {}} + + dependencies = dependency_dict[task_name] + plan = {task_name: {}} + + for dependency in dependencies: + sub_plan = get_subtask_plan(dependency, dependency_dict, level + 1) + plan[task_name].update(sub_plan) + + return plan + + root_tasks = get_root_tasks(dependency_dict) + for root_task in root_tasks: + execution_plan.update(get_subtask_plan(root_task, dependency_dict)) + + return execution_plan + + +class WorkflowProcessor: + """Class for processing a workflow.""" + + def __init__(self, workflow_file: str, dry_run: bool, threads: int, log_level: str = 'INFO', schema_file: Path | str = None): + """ + Initialize WorkflowProcessor. 
+ + Args: + workflow_file (str): Path to the workflow file (YAML format). + dry_run (bool): Display the plan without executing tasks. + threads (int): Number of threads to use for parallel execution. + log_level (str): Log level. + schema_file (Path | str): Path to the schema file (YAML format). + """ + logger.setLevel(log_level) + # Initialize the instance variables. + self.task_collection = WorkflowFile(workflow_file, schema_file).task_collection + self.dry_run = dry_run + self.threads = threads + + def execute_task(self, dag: DAG, task: dict, action) -> None: + """Execute a task.""" + task_name = task['task'] + if dag.should_be_canceled(task_name): + logger.warning("[%s] Skipping task due to dependency failure.", task_name) + dag.set_status(task_name, 'canceled') + else: + try: + task_object = self.create_task_object(task, action) + + logger.info("[%s] Starting task.", task_name) + start_time = time.time() + task_object.execute() + logger.info("[%s] Finished task in %.2f seconds.", task_name, time.time() - start_time) + dag.set_status(task_name, 'successful') + except KeyboardInterrupt as e: + logger.error("[%s] Task failed with error: %s.", task_name, e) + dag.set_status(task_name, 'failed') + dag.cancel_dependant_tasks(task_name, task.get('on-error', 'abort-related-flows')) + raise KeyboardInterrupt + except Exception as e: + logger.error("[%s] Task failed with error: %s.", task_name, e) + dag.set_status(task_name, 'failed') + dag.cancel_dependant_tasks(task_name, task.get('on-error', 'abort-related-flows')) + raise + + + def create_task_object(self, task: dict, action) -> Task: + """Create and return a Task object based on task type.""" + task_type = task[action]['this'] + + task_handler = TASKS_HANDLERS.get(task_type) + + if task_handler is not None: + return task_handler(task['task'], task[action]['with']) + + raise ValueError(f"Unknown task type '{task_type}'.") + + def execute_tasks_parallel(self, dag: DAG, reverse: bool = False) -> None: + """Execute 
tasks in parallel.""" + logger.info("Executing tasks in parallel.") + try: + with concurrent.futures.ThreadPoolExecutor(max_workers=self.threads) as executor: + futures = self.generate_futures(dag, executor, reverse) + concurrent.futures.wait(futures.values()) + except KeyboardInterrupt: + logger.error("User interrupt detected. Aborting execution...") + self.execute_tasks_parallel(dag, reverse=True) + + def generate_futures(self, dag, executor, reverse: bool = False): + futures = {} + + while True: + if not dag.is_active(): + break + + for task_name in list(dag.get_available_tasks()): + task = next(t for t in self.task_collection if t['task'] == task_name) + action = 'cleanup' if reverse and 'cleanup' in task else 'do' + if reverse and 'cleanup' not in task: + dag.set_status(task_name, 'successful') + else: + future = executor.submit(self.execute_task, dag, task, action) + futures[task_name] = future + + return futures + + + def run(self) -> None: + """Main entry point.""" + try: + if not self.dry_run: + logger.info("Executing DAG tasks.") + dag = DAG(self.task_collection) + self.execute_tasks_parallel(dag) + + logger.info("Executing Reverse DAG tasks.") + reversed_dag = DAG(self.task_collection, reverse=True) + self.execute_tasks_parallel(reversed_dag, reverse=True) + else: + dag = DAG(self.task_collection) + logger.info("Execution plan: %s", json.dumps(dag.get_execution_plan(), indent=2)) + + except Exception as e: + logger.error("Error in Workflow: %s", e) + + + def handle_interrupt(self, signum, frame): + logger.error("User interrupt detected. End process...") + raise KeyboardInterrupt("User interrupt detected. 
End process...") + + def abort_execution(self, futures) -> None: + """Abort the execution of tasks.""" + with concurrent.futures.ThreadPoolExecutor(max_workers=self.threads) as executor: + for future in concurrent.futures.as_completed(futures.values()): + try: + _ = future.result() + except Exception as e: + logger.error("Error in aborted task: %s", e) + executor.shutdown(wait=False, cancel_futures=True) diff --git a/deployability/version.json b/deployability/version.json new file mode 100644 index 0000000000..c885fdde40 --- /dev/null +++ b/deployability/version.json @@ -0,0 +1,4 @@ +{ + "version": "1.0", + "revision": "1" +} \ No newline at end of file