Skip to content

Commit

Permalink
Merge pull request #2416 from wazuh/2401-wazuh-metrics-reconnecting
Browse files Browse the repository at this point in the history
Fix `wazuh-metrics` CLI bug when child processes restart
  • Loading branch information
snaow authored Jan 24, 2022
2 parents b06e497 + e1e3621 commit 027eb9a
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 16 deletions.
86 changes: 81 additions & 5 deletions deps/wazuh_testing/wazuh_testing/scripts/wazuh_metrics.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,32 @@
import argparse
import logging
from collections import defaultdict
from datetime import datetime
from os import makedirs
from os.path import join
from signal import signal, SIGTERM, SIGINT
from tempfile import gettempdir
from time import time
from time import time, sleep

from wazuh_testing.tools.performance.binary import Monitor, logger

METRICS_FOLDER = join(gettempdir(), 'process_metrics')
CURRENT_SESSION = join(METRICS_FOLDER, datetime.now().strftime('%d-%m-%Y'), str(int(time())))
MONITOR_LIST = []
ACTIVE_MONITORS = defaultdict(list)
SESSION_ACTIVE = True


def shutdown_threads(signal_number, frame):
# Signal handler (registered for SIGTERM and SIGINT in main()): marks the session
# as finished and asks every tracked monitor to shut down.
# NOTE(review): this listing appears to be a rendered diff, so the pre-change lines
# (the MONITOR_LIST loop header and the 'Process finished' log) seem to be shown next
# to the lines that replace them - confirm against the actual file.
logger.info('Attempting to shutdown all monitor threads')
for monitor in MONITOR_LIST:

# Clearing this module-level flag ends the `while SESSION_ACTIVE` loop in monitors_healthcheck()
global SESSION_ACTIVE
SESSION_ACTIVE = False

# Shutdown all possible monitors
# sum(..., []) concatenates the per-process monitor lists into a single flat list
for monitor in sum(ACTIVE_MONITORS.values(), []):
monitor.shutdown()
logger.info('Process finished')

logger.info('Process finished gracefully')


def get_script_arguments():
Expand All @@ -33,12 +41,78 @@ def get_script_arguments():
parser.add_argument('-v', '--version', dest='version', default=None, help='Version of the binaries. Default none.')
parser.add_argument('-d', '--debug', dest='debug', action='store_true', default=False,
help='Enable debug level logging.')
parser.add_argument('-H', '--healthcheck-time', dest='healthcheck_time', action='store', default=10, type=int,
help='Time in seconds between each health check.')
parser.add_argument('-r', '--retries', dest='health_retries', action='store', default=5, type=int,
help='Number of reconnection retries before aborting the monitoring process.')
parser.add_argument('--store', dest='store_path', action='store', default=gettempdir(),
help=f"Path to store the CSVs with the data. Default {gettempdir()}.")

return parser.parse_args()


def check_monitors_health(options):
    """Check every tracked monitor and recreate the monitors of any failed process.

    A process is treated as unhealthy when at least one of its monitors has its
    ``event`` flag set. For such a process the current PIDs are looked up again,
    all of its old monitors are shut down and a fresh monitor is started per PID.

    Args:
        options (argparse.Namespace): object containing the script options. The
            attributes read here are ``data_unit``, ``sleep_time``, ``version``
            and ``store_path``.

    Returns:
        bool: True if no process needed attention or every unhealthy process got
            new monitors. False if new PIDs could not be obtained for at least
            one process.
    """
    healthy = True
    for process, monitors in ACTIVE_MONITORS.items():
        # Nothing to do while none of this process' monitors has flagged a failure
        if not any(m.event.is_set() for m in monitors):
            continue

        logger.warning(f'Monitoring of {process} failed. Attempting to create new monitor instances')

        try:
            # Try to get new PIDs
            process_pids = Monitor.get_process_pids(process)
            # Shutdown all the related monitors to the failed process (necessary for multiprocessing)
            for monitor in monitors:
                monitor.shutdown()
        except ValueError:
            healthy = False
            logger.warning(f'Could not create new monitor instances for {process}')
            continue

        if not process_pids:
            # No PID to attach to: keep the stopped monitors in place so that the
            # next health check retries, and report the failure to the caller.
            healthy = False
            logger.warning(f'Could not create new monitor instances for {process}')
            continue

        for i, pid in enumerate(process_pids):
            # The first PID keeps the plain process name, the rest are tagged as children
            p_name = process if i == 0 else f'{process}_child_{i}'
            monitor = Monitor(process_name=p_name, pid=pid, value_unit=options.data_unit,
                              time_step=options.sleep_time,
                              version=options.version, dst_dir=options.store_path)
            monitor.start()

            # Replace old monitors for new ones (same list object as ACTIVE_MONITORS[process])
            if i < len(monitors):
                monitors[i] = monitor
            else:
                monitors.append(monitor)

        # The process may have come back with fewer PIDs than before. Drop the
        # leftover, already shut down monitors; otherwise they would presumably
        # keep their event set and trigger a full restart on every health check.
        del monitors[len(process_pids):]

    return healthy


def monitors_healthcheck(options):
    """Check each monitor's health while the session is active.

    Runs ``check_monitors_health`` every ``options.healthcheck_time`` seconds
    until ``SESSION_ACTIVE`` becomes false. Consecutive failed checks are
    counted; a successful check resets the counter.

    Args:
        options (argparse.Namespace): object containing the script options. The
            attributes read here are ``health_retries`` and ``healthcheck_time``.

    Raises:
        SystemExit: with exit code 1 once ``options.health_retries`` consecutive
            health checks have failed.
    """
    consecutive_failures = 0
    while SESSION_ACTIVE:
        if check_monitors_health(options):
            consecutive_failures = 0
        else:
            consecutive_failures += 1
            if consecutive_failures >= options.health_retries:
                logger.error('Reached maximum number of retries. Aborting')
                # Raise SystemExit directly: the `exit` helper is injected by the
                # `site` module for interactive use and is not always available.
                raise SystemExit(1)

        sleep(options.healthcheck_time)


def main():
signal(SIGTERM, shutdown_threads)
signal(SIGINT, shutdown_threads)
Expand All @@ -59,7 +133,9 @@ def main():
monitor = Monitor(process_name=p_name, pid=pid, value_unit=options.data_unit, time_step=options.sleep_time,
version=options.version, dst_dir=options.store_path)
monitor.start()
MONITOR_LIST.append(monitor)
ACTIVE_MONITORS[process].append(monitor)

monitors_healthcheck(options)


if __name__ == '__main__':
Expand Down
13 changes: 2 additions & 11 deletions deps/wazuh_testing/wazuh_testing/tools/performance/binary.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@

import psutil

MONITOR_LIST = []

logger = logging.getLogger('wazuh-monitor')
logger.setLevel(logging.INFO)

Expand Down Expand Up @@ -175,15 +173,8 @@ def unit_conversion(x):
self.previous_read = info[f'Disk_Read({self.value_unit})']
self.previous_write = info[f'Disk_Written({self.value_unit})']
except psutil.NoSuchProcess:
logger.warning(f'Lost PID for {self.process_name}. Trying to obtain a new one. '
'If the process has child processes, this test will not be valid')
try:
# Try to get another PID for the current process name. This could be wrong if there is more than
# one process with the same name (child processes)
self.pid = Monitor.get_process_pids(self.process_name, check_children=False)[0]
self.set_process()
except ValueError:
logger.warning(f'Could not obtain a new PID for {self.process_name}. Trying again in {self.time_step}s')
logger.warning(f'Lost PID for {self.process_name}')
self.shutdown()
finally:
info.update({key: round(value, 2) for key, value in info.items() if isinstance(value, (int, float))})
logger.debug(f'Recollected data for process {self.pid}')
Expand Down

0 comments on commit 027eb9a

Please sign in to comment.