Skip to content

Commit

Permalink
Merge pull request #1837 from wazuh/1803-qactl-documentation
Browse files Browse the repository at this point in the history
Add documentation for the config_generator.py file
  • Loading branch information
jmv74211 authored Sep 3, 2021
2 parents 3f51723 + 3375c8b commit d7f7e6f
Showing 1 changed file with 131 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import random
import os

from tempfile import gettempdir
Expand All @@ -15,6 +14,27 @@


class QACTLConfigGenerator:
"""Class implemented with te purpose of generating the configuration fields needed in a YAML
configuration file of the QACTL tool. In order to get te configuration file generated automatically,
we use the info given by the documentation of the tests that are going to be executed. Plus, there can
be also a wazuh version parameter that indicates which version of wazuh the user wants to install.
Args:
tests (list): list with all the test that the user desires to run.
wazuh_version (string): version of the wazuh packages that the user desires to install.
This parameter is set to None by default. In case that version parameter is not given, the
wazuh package version will be taken from the documetation test information
Attributes:
tests (list): list with all the test that the user desires to run.
wazuh_version (string): version of the wazuh packages that the user desires to install.
This parameter is set to None by default. In case that version parameter is not given, the
wazuh package version will be taken from the documetation test information
qactl_used_ips_file (string): string with the full path where the file containing the used IPs
is located.
config_file_path (string): string with the full path where the YAML configuration file will be
generated.
"""

BOX_MAPPING = {
'ubuntu': 'qactl/ubuntu_20_04',
Expand Down Expand Up @@ -51,6 +71,11 @@ def __init__(self, tests, wazuh_version):
self.hosts = []

def __get_last_wazuh_version(self):
"""Get the last version of the wazuh packages available.
Returns:
str: string with the last available version of the wazuh packages.
"""
return '4.2.0'

def __get_qa_branch(self):
Expand All @@ -76,13 +101,26 @@ def __qa_docs_mocking(self, test_name):
file.write_json_file(mocked_file, mocking_data)

def __get_test_info(self, test_name):
"""Get information from a documented test.
Args:
test_name (string): string containing the name of the test.
Returns:
dict : return the info of the named test in dict format.
"""
self.__qa_docs_mocking(test_name)
info = file.read_json_file(f"{gettempdir()}/mocked_data.json")
info['test_name'] = test_name
file.delete_file(f"{gettempdir()}/mocked_data.json")
return info

def __get_all_tests_info(self):
"""Get the info of the documentation of all the test that are going to be run.
Returns:
dict object : dict containing all the information of the tests given from their documentation.
"""
#tests_info = [ __get_test_info(test) for test in self.tests ]
tests_info = [
{
Expand Down Expand Up @@ -117,20 +155,57 @@ def __get_all_tests_info(self):


def __validate_test_info(self, test_info, user_input=True, log_error=True):
"""Validate the test information in order to check that the fields that contains are suitable
for trying to generate a configuration data.
Args:
test_info (dict): dict containing all the info of the test. This info is by its containing documentation
user_input (boolean): boolean that checks if there is going to be an input from the user.
This parameter is set to True by default.
log_error (boolean): boolean that checks if there is going to be a logging of the errors.
This parameter is set to True by default.
Returns:
boolean : True if the validation has succeed, False otherwise
"""
def _ask_user_input():
user_continue = input('Do you want to continue with qa-ctl running? [y/n]: ')
if user_continue.lower() != 'y':
QACTLConfigGenerator.LOGGER.debug('The user has decided to stop execution due to incompatibility '
'between test and qa-ctl')
exit(0)
"""Ask if the user desires to continue with the current instance execution"""
user_continue = input('Do you want to continue with qa-ctl running? [y/n]: ')
if user_continue.lower() != 'y':
QACTLConfigGenerator.LOGGER.debug('The user has decided to stop execution due to incompatibility '
'between test and qa-ctl')
exit(0)

def _validation_error(log_error=True, error_message=None, user_input=True):
"""Show the possible validation error and check if the user wants to continue with the execution even if the
validation has failed.
Agrs:
user_input (boolean): boolean that checks if there is going to be an input from the user.
This parameter is set to True by default.
error_message (string): string containing the error message. This parameter is set to None by default.
log_error (boolean): boolean that checks if there is going to be a logging of the errors.
This parameter is set to True by default.
"""
if log_error:
QACTLConfigGenerator.LOGGER.error(error_message)
if user_input:
_ask_user_input()

def _check_validate(check, allowed_values, user_input, log_error, test_name, system):
"""Check if the validation process for a field has succeed.
Args:
check (string) : item that is going to be validated.
allowed_values (dict): dict containing all the allowed values for the item.
user_input (boolean): boolean that checks if there is going to be an input from the user.
log_error (boolean): boolean that checks if there is going to be a logging of the errors.
test_name (string): name of the test.
system (string): name of the system needed for running the test.
Returns:
boolean : True in case the validation checking has succeed, false otherwise.
"""
if check not in allowed_values:
error_message = f"{test_name} cannot be launched. Reason: Currently we do not "\
f"support {system}. Allowed values: {allowed_values}"
Expand Down Expand Up @@ -169,6 +244,11 @@ def _check_validate(check, allowed_values, user_input, log_error, test_name, sys
return True

def __get_host_IP(self):
"""Get an unused ip dinamically and in the range of 10.150.50.x. The ip is generated automatically
by checking in the IPs used file which IPs are already being used in order to avoid re-using them.
Returns:
str: string containing an unused IP."""
HOST_NETWORK = '10.150.50.x'

def ip_is_already_used(ip, qactl_host_used_ips):
Expand Down Expand Up @@ -199,13 +279,31 @@ def ip_is_already_used(ip, qactl_host_used_ips):
return host_ip

def __delete_ip_entry(self, host_ip):
"""Delete an IP entry in the file that contains all the IPs that are currently being used.
Args:
host_ip (string): contains the ip that is going to be deleted from the used IPs file.
"""
data = file.read_file(self.qactl_used_ips_file)

data = data.replace(f"{host_ip}\n", '')

file.write_file(self.qactl_used_ips_file, data)

def __add_instance(self, test_vendor, test_name, test_target, test_system, vm_cpu=1, vm_memory=1024):
"""Add a new provider instance for the deployment module. T
Args:
test_vendor (string): name of the vendor of the vagrant box.
test_name (string): contains the name of the test that is going to be run.
test_target (string): contains the target of the test.
test_system (string): The system in where the test needs to be run.
vm_cpu (int): number of CPUs that will be dedicated to the new vagrant box.
This parameter is set to 1 by default.
vm_memory (int): size of the ram that will be dedicated to the new vagrant box.
Returns:
dict object: dict containing all the field required for generating a new vagrant box in the deployment module."""
instance_ip = self.__get_host_IP()
instance = {
'enabled': True,
Expand All @@ -223,6 +321,14 @@ def __add_instance(self, test_vendor, test_name, test_target, test_system, vm_cp
return instance

def __get_package_url(self, instance):
"""Get the url of the package that needs to be installed.
Args:
instance (dict): dict object with all the information needed to generate the url of the package.
Returns:
package_url (string): String with the URL of the package.
"""
target = 'manager' if 'manager' in self.config['deployment'][instance]['provider']['vagrant']['label'] \
else 'agent'
vagrant_box = self.config['deployment'][instance]['provider']['vagrant']['vagrant_box']
Expand All @@ -234,6 +340,11 @@ def __get_package_url(self, instance):
return package_url

def __process_deployment_data(self, tests_info):
"""Generate the data for the deployment module with the information of the tests given as parameter.
Args:
test_info(dict object): dict object containing information of all the tests that are going to be run.
"""
self.config['deployment'] = {}

for test in tests_info:
Expand All @@ -257,6 +368,7 @@ def __process_deployment_data(self, tests_info):
}

def __process_provision_data(self):
"""Generate the data for the provision module using the fields from the already generated deployment module."""
self.config['provision'] = {'hosts': {}}

for instance in self.config['deployment'].keys():
Expand Down Expand Up @@ -292,6 +404,11 @@ def __process_provision_data(self):
}

def __process_test_data(self, tests_info):
"""Generate the data for the test module with the information of the tests given as parameter.
Args:
test_info(dict object): dict object containing information of all the tests that are going to be run.
"""
self.config['tests'] = {}
test_host_number = len(self.config['tests'].keys()) + 1

Expand All @@ -316,16 +433,24 @@ def __process_test_data(self, tests_info):
test_host_number += 1

def __process_test_info(self, tests_info):
"""Process all the info of the desired tests that are going to be run in order to generate the data configuration
for the YAML config file.
Args:
tests_info(dict object): dict object containing information of all the tests that are going to be run.
"""
self.__process_deployment_data(tests_info)
self.__process_provision_data()
self.__process_test_data(tests_info)

def run(self):
"""Run an instance with the parameters created. This generates the YAML configuration file automatically."""
info = self.__get_all_tests_info()
self.__process_test_info(info)
file.write_yaml_file(self.config_file_path, self.config)

def destroy(self):
"""Destroy the instance created by deleting its ip entry in the used IPs file and its configuration file."""
for host_ip in self.hosts:
self.__delete_ip_entry(host_ip)

Expand Down

0 comments on commit d7f7e6f

Please sign in to comment.