From dbc75a9392ad8e1d3e71a49ef2337f977ee91f9d Mon Sep 17 00:00:00 2001 From: Robin Schneider <36660054+ypid-geberit@users.noreply.github.com> Date: Fri, 23 Jul 2021 18:10:02 +0200 Subject: [PATCH] Fix/improve watcher test scripts: YAML support, metadata-git-commit, better error handling, --cacert, multiple time fields (#239) * run_test.py: Improve/cleanup and add YAML support for input files * Cleanup shell scripts according to ShellCheck recommendations * run_test.py: Cleanup Indices after test to not pollute tests env * run_test.py: More useful error message if logging action did not run * run_test.py: Refactor * run_test.py: Use load_file() for ES scripts as well to support YAML * run_test.py: Implement --no-execute-watch needed for deployment Needed for: https://github.com/elastic/elasticsearch/pull/30112#issuecomment-384889975 > I already have a workaround in place for this which consists of automatically deploying as many watches as I need to send different mails. Those watches are derived from my watch definition. For this, I extended [run_test.py](https://github.com/elastic/examples/blob/master/Alerting/Sample%20Watches/run_test.py) to inject Python code after the watch definition is read. Not ideal, but it is maintainable. * run_test.py: Support to inject Python code, useful for deployment Needed for: https://github.com/elastic/elasticsearch/pull/30112#issuecomment-384889975 > I already have a workaround in place for this which consists of automatically deploying as many watches as I need to send different mails. Those watches are derived from my watch definition. For this, I extended [run_test.py](https://github.com/elastic/examples/blob/master/Alerting/Sample%20Watches/run_test.py) to inject Python code after the watch definition is read. Not ideal, but it is maintainable. * run_test.py: Implement --no-test-index needed for deployment Needed for: https://github.com/elastic/elasticsearch/pull/30112#issuecomment-384889975 > I already have a workaround in place for this which consists of automatically deploying as many watches as I need to send different mails. Those watches are derived from my watch definition. For this, I extended [run_test.py](https://github.com/elastic/examples/blob/master/Alerting/Sample%20Watches/run_test.py) to inject Python code after the watch definition is read. Not ideal, but it is maintainable. * run_test.py: Add --metadata-git-commit switch to augment watch metadata * run_test.py: Add --cacert parameter * run_test.py: More useful error message if logging action did not run * run_test.py: Use `git rev-parse --short HEAD` for --metadata-git-commit * run_test.py: More useful error message if transform failed * run_test.py: Implement --minify-scripts Workaround for: https://github.com/elastic/elasticsearch/pull/35184 * "Scripts may be no longer than 16384 characters." is in ES6.6 * run_test.py: Improve compatibility with ES 7.0.x and index templates * run_test.py: Better error message if expected_response is not defined * run_test.py: Show watch exception on execution failure * run_test.py: In case a transform fails the transform input is relevant * run_test.py: Support multiple time fields Useful when you have two time fields that in reality should be very close so in testing it is enough to set them to the same value. * [run_test.py] ES 7 support. Update to Py3 and drop elasticsearch_xpack. * [run_test.py] Add --verbose parameter to debug ES responses * [run_test.py] Comply with Python Enhancement Proposals * [run_test.py] Comply with reuse.software * [run_test.py] Avoid `not` in condition to make it easier to understand * [run_test.py] Use str.format instead of "%s" % for consistency * [run_test.py] Fix ./run_all_tests.sh test run. All passing again. * [run_test.py] Support nested fields in time_fields test parameter Example: ```yaml time_fields: - '@timestamp' - 'event.created' ``` * [run_test.py] Use dict.get shortcut --- Alerting/Sample Watches/load_watch.sh | 20 +- Alerting/Sample Watches/run_all_tests.sh | 4 +- Alerting/Sample Watches/run_test.py | 241 +++++++++++++++++------ Alerting/Sample Watches/run_test.sh | 43 ++-- 4 files changed, 213 insertions(+), 95 deletions(-) mode change 100644 => 100755 Alerting/Sample Watches/run_test.py diff --git a/Alerting/Sample Watches/load_watch.sh b/Alerting/Sample Watches/load_watch.sh index 9c205ab8..d6740ac4 100755 --- a/Alerting/Sample Watches/load_watch.sh +++ b/Alerting/Sample Watches/load_watch.sh @@ -1,3 +1,5 @@ +#!/bin/bash + if [ -z "$1" ] ; then echo "USAGE: load_watch.sh : " echo "eg: ./load_watch.sh port_scan elastic changeme my_remote_cluster.mydomain:9200 https" @@ -19,11 +21,11 @@ fi port=9200 endpoint=localhost if [ "$4" ] ; then - if ":" in $4; then - endpoint=${4%":"*} # extractthe host value from the provided endpoint + if ":" in "$4"; then + endpoint=${4%":"*} # extract the host value from the provided endpoint port=${4#*":"} # extract the port value if provided in endpoint:port format if [ "$port" == "" ]; then - # if port is blank, due to endpoint provided as localhost: or no port providedthen use default port + # if port is blank, due to endpoint provided as localhost: or no port provided then use default port port=9200 fi else @@ -46,13 +48,13 @@ fi echo "Loading $1 scripts" shopt -s nullglob -for script in $1/scripts/*.json +for script in "$1/scripts"/*.json do filename=$(basename "$script") scriptname="${filename%.*}" - echo $scriptname - es_response=$(curl -H "Content-Type: application/json" -s -X POST $protocol$endpoint:$port/_scripts/$scriptname -u $username:$password -d @$script) - if [ 0 -eq $? ] && [ $es_response = '{"acknowledged":true}' ]; then + echo "$scriptname" + es_response=$(curl -H "Content-Type: application/json" -s -X POST "$protocol$endpoint:$port/_scripts/$scriptname" -u "$username:$password" -d "@$script") + if [ 0 -eq $? ] && [ "$es_response" = '{"acknowledged":true}' ]; then echo "Loading $scriptname script...OK" else echo "Loading $scriptname script...FAILED" @@ -62,9 +64,9 @@ done echo "Removing existing $1 watch " -curl -H "Content-Type: application/json" -s -X DELETE $protocol$endpoint:$port/_xpack/watcher/watch/$1 -u $username:$password +curl -H "Content-Type: application/json" -s -X DELETE "$protocol$endpoint:$port/_xpack/watcher/watch/$1" -u "$username:$password" echo "Loading $1 watch " -es_response=$(curl -H "Content-Type: application/json" --w "%{http_code}" -s -o /dev/null -X PUT $protocol$endpoint:$port/_xpack/watcher/watch/$1 -u $username:$password -d @$1/watch.json) +es_response=$(curl -H "Content-Type: application/json" --w "%{http_code}" -s -o /dev/null -X PUT "$protocol$endpoint:$port/_xpack/watcher/watch/$1" -u "$username:$password" -d "@$1/watch.json") if [ 0 -eq $? ] && [ $es_response = "201" ]; then echo "Loading $1 watch...OK" exit 0 diff --git a/Alerting/Sample Watches/run_all_tests.sh b/Alerting/Sample Watches/run_all_tests.sh index 20faad44..3d300fcf 100755 --- a/Alerting/Sample Watches/run_all_tests.sh +++ b/Alerting/Sample Watches/run_all_tests.sh @@ -1,2 +1,4 @@ #!/usr/bin/env bash -./run_test.sh '**' $1 $2 $3 $4 $5 +set -o nounset -o pipefail -o errexit + +./run_test.sh '**' "${1:-}" "${2:-}" "${3:-}" "${4:-}" "${5:-}" diff --git a/Alerting/Sample Watches/run_test.py b/Alerting/Sample Watches/run_test.py old mode 100644 new mode 100755 index 0460c71b..67b0c589 --- a/Alerting/Sample Watches/run_test.py +++ b/Alerting/Sample Watches/run_test.py @@ -1,81 +1,198 @@ -import sys +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +# SPDX-FileCopyrightText: 2017 Dale McDiarmid +# SPDX-FileCopyrightText: 2017-2020 Robin Schneider +# SPDX-FileCopyrightText: 2020 Dan Roscigno +# SPDX-License-Identifier: Apache-2.0 -__author__ = 'dalem@elastic.co' +from __future__ import (print_function, unicode_literals, + absolute_import, division) import datetime +import json +import logging +import subprocess +import sys + +import yaml + from elasticsearch7 import Elasticsearch from elasticsearch7.client.ingest import IngestClient -import argparse -import json -parser = argparse.ArgumentParser(description='Index Connection Log data into ES with the last event at the current time') -parser.add_argument('--user',help='user') -parser.add_argument('--password',help='password') -parser.add_argument('--endpoint',help='endpoint') -parser.add_argument('--port',help='port') -parser.add_argument('--protocol',help='protocol') -parser.add_argument('--test_file',help='test file') - -parser.set_defaults(endpoint='localhost',port="9200",protocol="http",test_file='data.json',user='elastic',password='changeme') -args = parser.parse_args() -es = Elasticsearch([args.protocol+"://"+args.endpoint+":"+args.port],http_auth=(args.user, args.password)) - -def find_item(list, key): - for item in list: - if key in item: - return item - return None - -with open(args.test_file,'r') as test_file: - test=json.loads(test_file.read()) - try: - es.indices.delete(test['index']) - except: - print("Unable to delete current dataset") - pass - with open(test['mapping_file'],'r') as mapping_file: - es.indices.create(index=test["index"],body=json.loads(mapping_file.read())) - params={} - if "ingest_pipeline_file" in test: - with open(test['ingest_pipeline_file'],'r') as ingest_pipeline_file: - pipeline=json.loads(ingest_pipeline_file.read()) + +def set_value_as_default_for_leaf(nested_dict, path_exp, value): + if len(path_exp) == 1: + nested_dict.setdefault(path_exp[0], value) + elif path_exp[0] in nested_dict: + set_value_as_default_for_leaf(nested_dict[path_exp[0]], path_exp[1:], value) + + +def load_file(serialized_file): + with open(serialized_file, 'r') as serialized_file_fh: + if serialized_file.endswith('.json'): + decoded_object = json.loads(serialized_file_fh.read()) + elif serialized_file.endswith('.yml') or serialized_file.endswith('.yaml'): + decoded_object = yaml.safe_load(serialized_file_fh) + return decoded_object + + +if __name__ == '__main__': + import argparse + + parser = argparse.ArgumentParser(description='Index Connection Log data into ES with the last event at the current time') + parser.add_argument('-v', '--verbose', help='verbose output', action='store_true') + parser.add_argument('--endpoint', help='endpoint') + parser.add_argument('--port', help='port') + parser.add_argument('--protocol', help='protocol') + parser.add_argument('--cacert', help='CA certificate to trust for HTTPS') + parser.add_argument('--user', help='user') + parser.add_argument('--password', help='password') + parser.add_argument('--test_file', help='test file') + parser.add_argument( + '--minify-scripts', + help='Minify script source code as workaround for' + + ' "Scripts may be no longer than 16384 characters." in ES < v6.6.', + action='store_true') + # Ref: https://github.com/elastic/elasticsearch/pull/35184 + parser.add_argument('--keep-index', help='Keep the index where test documents have been loaded to after the test', action='store_true') + parser.add_argument('--metadata-git-commit', help='Include the git commit hash in the metadata field of the watcher', action='store_true') + parser.add_argument('--modify-watch-by-eval', help='Python code to modify the watch before loading it into Elastic') + parser.add_argument( + '--no-test-index', + help='Don’t put the test data into an index.', + action='store_false', + dest='test_index') + parser.add_argument( + '--no-execute-watch', + help='Do not force watch execution. This can be useful when you use this script to deploy the watch.', + action='store_false', + dest='execute_watch') + + parser.set_defaults(endpoint='localhost', port="9200", protocol="http", test_file='data.json', user='elastic', password='changeme') + args = parser.parse_args() + + if args.verbose: + logging.basicConfig(level=logging.DEBUG) + + es = Elasticsearch([args.protocol+"://"+args.endpoint+":"+args.port], http_auth=(args.user, args.password), ca_certs=args.cacert) + + test = load_file(args.test_file) + + if args.test_index: + # Load Mapping + try: + es.indices.delete(test['index']) + except Exception as err: + print("Unable to delete current dataset") + pass + index_template = load_file(test['mapping_file']) + for unneeded_keys in ['order', 'version', 'index_patterns']: + index_template.pop(unneeded_keys, None) + es.indices.create(index=test["index"], body=index_template) + + # Load pipeline if its declared + params = {} + if "ingest_pipeline_file" in test: + pipeline = load_file(test['ingest_pipeline_file']) p = IngestClient(es) - p.put_pipeline(id=test["watch_name"],body=pipeline) - params["pipeline"]=test["watch_name"] - current_data=last_time=datetime.datetime.utcnow() - i=0 - time_field = test["time_field"] if "time_field" in test else "@timestamp" - for event in test['events']: - event_time=current_data+datetime.timedelta(seconds=int(event['offset'] if 'offset' in event else 0)) - event[time_field]=event_time.strftime('%Y-%m-%dT%H:%M:%S.%fZ') if not time_field in event else event[time_field] - es.index(index=test['index'],body=event,id=event['id'] if "id" in event else i,params=params) - i+=1 - es.indices.refresh(index=test["index"]) + p.put_pipeline(id=test["watch_name"], body=pipeline) + params["pipeline"] = test["watch_name"] + + # Index data + current_data = last_time = datetime.datetime.utcnow() + i = 0 + time_fields = test.get('time_fields', test.get('time_field', '@timestamp')) + time_fields = set([time_fields] if isinstance(time_fields, str) else time_fields) + for event in test['events']: + # All offsets are in seconds. + event_time = current_data+datetime.timedelta(seconds=int(event.get('offset', 0))) + for time_field in time_fields: + time_field = time_field.split('.') + set_value_as_default_for_leaf(event, time_field, event_time.strftime('%Y-%m-%dT%H:%M:%S.%fZ')) + es.index(index=test['index'], body=event, id=event.get('id', i), params=params) + i += 1 + es.indices.refresh(index=test["index"]) + + # Load Scripts if 'scripts' in test: for script in test['scripts']: - with open(script['path'], 'r') as script_file: - es.put_script(id=script["name"],body=json.loads(script_file.read())) + script_content = load_file(script['path']) + if args.minify_scripts: + # https://stackoverflow.com/questions/30795954/how-to-uglify-or-minify-c-code + p = subprocess.Popen(['gcc', '-fpreprocessed', '-dD', '-E', '-P', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE) + script_content['script']['source'] = p.communicate(input=script_content['script']['source'].encode('utf-8'))[0].decode('utf-8') + es.put_script(id=script["name"], body=script_content) - with open(test['watch_file'],'r') as watch_file: - watch=json.loads(watch_file.read()) - es.watcher.put_watch(id=test["watch_name"],body=watch) - response=es.watcher.execute_watch(id=test["watch_name"]) + # Load Watch and Execute + watch = load_file(test['watch_file']) + if args.modify_watch_by_eval: + eval(compile(args.modify_watch_by_eval, '', 'exec')) + + if args.metadata_git_commit: + watch.setdefault('metadata', {}) + watch['metadata']['git_commit_hash'] = subprocess.check_output(['git', 'rev-parse', '--short', 'HEAD']).strip() + watch['metadata']['git_uncommitted_changes'] = True if len(subprocess.check_output(['git', 'status', '--porcelain']).strip()) > 0 else False + + es.watcher.put_watch(id=test["watch_name"], body=watch) + + if args.execute_watch: + response = es.watcher.execute_watch(id=test["watch_name"]) + + # Cleanup after the test to not pollute the environment for other tests. + if not args.keep_index: + try: + es.indices.delete(test['index']) + except Exception as err: + print("Unable to delete current dataset") + pass + + # Confirm Matches match = test['match'] if 'match' in test else True - print("Expected: Watch Condition: %s"%match) - if not 'condition' in response['watch_record']['result']: - print("Condition not evaluated due to watch error") + print("Expected: Watch Condition: {}".format(match)) + if 'condition' not in response['watch_record']['result']: + print("Condition not evaluated due to watch error: {}".format( + json.dumps(response['watch_record'], sort_keys=True, indent=2) + )) print("TEST FAIL") sys.exit(1) - met=response['watch_record']['result']['condition']['met'] - print("Received: Watch Condition: %s"%met) + met = response['watch_record']['result']['condition']['met'] + print("Received: Watch Condition: {}".format(met)) if match: if met and response['watch_record']['result']['condition']['status'] == "success": - print("Expected: %s"%test['expected_response']) - logging=find_item(response['watch_record']['result']['actions'],'logging')['logging'] + print("Expected: {}".format(test.get('expected_response'))) + if len(response['watch_record']['result']['actions']) == 0: + if response['watch_record']['result']['transform']['status'] == 'failure': + print("No actions where taken because transform failed: {}".format( + json.dumps(response['watch_record']['result'], sort_keys=True, indent=2) + )) + else: + print("No actions where taken: {}".format( + json.dumps(response['watch_record']['result'], sort_keys=True, indent=2) + )) + print("TEST FAIL") + sys.exit(1) + + logging_action = next((action for action in response['watch_record']['result']['actions'] if action["type"] == "logging"), None) + if logging_action is None: + print("No logging actions was taken. This test framework uses the logging action for comparison so you might need enable this action.") + print("TEST FAIL") + sys.exit(1) + if logging_action.get('transform', {}).get('status', 'success') != 'success': + print("Logging transform script failed: {}".format( + json.dumps(logging_action.get('transform', {}), sort_keys=True, indent=2), + )) + print("TEST FAIL") + sys.exit(1) + if 'logging' not in logging_action: + print("Logging action is not present: {}".format(logging_action)) + print("TEST FAIL") + sys.exit(1) + logging = logging_action['logging'] if logging: - print("Received: %s"%logging['logged_text']) - if logging['logged_text'] == test['expected_response']: + print("Received: {}".format(logging['logged_text'])) + if logging['logged_text'] == test.get('expected_response'): print("TEST PASS") sys.exit(0) else: @@ -83,5 +200,5 @@ def find_item(list, key): print("TEST FAIL") sys.exit(1) else: - print("TEST %s"%("PASS" if not response['watch_record']['result']['condition']['met'] else "FAIL")) + print("TEST {}".format("FAIL" if response['watch_record']['result']['condition']['met'] else "PASS")) sys.exit(met) diff --git a/Alerting/Sample Watches/run_test.sh b/Alerting/Sample Watches/run_test.sh index 120a2fcf..d3ea9e38 100755 --- a/Alerting/Sample Watches/run_test.sh +++ b/Alerting/Sample Watches/run_test.sh @@ -1,25 +1,26 @@ if [ -z "$1" ]; then -echo "Specify watch name e.g. run_test.sh " + echo "Specify watch name e.g. run_test.sh " + exit 1 fi username=elastic if [ "$2" ] ; then - username=$2 + username="$2" fi password=changeme if [ "$3" ] ; then - password=$3 + password="$3" fi port=9200 endpoint=localhost if [ "$4" ] ; then - if ":" in $4; then - endpoint=${4%":"*} # extractthe host value from the provided endpoint + if ":" in "$4"; then + endpoint=${4%":"*} # extract the host value from the provided endpoint port=${4#*":"} # extract the port value if provided in endpoint:port format if [ "$port" == "" ]; then - # if port is blank, due to endpoint provided as localhost: or no port providedthen use default port + # if port is blank, due to endpoint provided as localhost: or no port provided then use default port port=9200 fi else @@ -36,26 +37,22 @@ num_tests=0 pass=0 fails=0 echo "--------------------------------------------------" -for test in `ls $1/tests/*.json`; do -echo "Running test $test" -python3 run_test.py --user $username --password $password --endpoint $endpoint --port $port --protocol $protocol --test_file $test +# shellcheck disable=SC2231 +for test in $1/tests/*.json; do + echo "Running test $test" -if [ $? -eq 0 ]; then -let pass=pass+1 -else -let fails=fails+1 -fi -let num_tests=num_tests+1 -echo "--------------------------------------------------" -done; + if python3 run_test.py --user "$username" --password "$password" --endpoint "$endpoint" --port "$port" --protocol "$protocol" --test_file "$test"; then + pass=$(( pass+1 )) + else + fails=$(( fails+1 )) + fi + num_tests=$(( num_tests+1 )) + echo "--------------------------------------------------" +done echo "$num_tests tests run: $pass passed. $fails failed." if [ $fails -eq 0 ]; then -exit 0 + exit 0 else -exit 1 + exit 1 fi - - - -