Skip to content

Commit

Permalink
Fix/improve watcher test scripts: YAML support, metadata-git-commit, …
Browse files Browse the repository at this point in the history
…better error handling, --cacert, multiple time fields (#239)

* run_test.py: Improve/cleanup and add YAML support for input files

* Cleanup shell scripts according to ShellCheck recommendations

* run_test.py: Cleanup Indices after test to not pollute tests env

* run_test.py: More useful error message if logging action did not run

* run_test.py: Refactor

* run_test.py: Use load_file() for ES scripts as well to support YAML

* run_test.py: Implement --no-execute-watch needed for deployment

Needed for: elastic/elasticsearch#30112 (comment)

> I already have a workaround in place for this which consists of automatically deploying as many watches as I need to send different mails. Those watches are derived from my watch definition. For this, I extended [run_test.py](https://github.com/elastic/examples/blob/master/Alerting/Sample%20Watches/run_test.py) to inject Python code after the watch definition is read. Not ideal, but it is maintainable.

* run_test.py: Support to inject Python code, useful for deployment

Needed for: elastic/elasticsearch#30112 (comment)

> I already have a workaround in place for this which consists of automatically deploying as many watches as I need to send different mails. Those watches are derived from my watch definition. For this, I extended [run_test.py](https://github.com/elastic/examples/blob/master/Alerting/Sample%20Watches/run_test.py) to inject Python code after the watch definition is read. Not ideal, but it is maintainable.

* run_test.py: Implement --no-test-index needed for deployment

Needed for: elastic/elasticsearch#30112 (comment)

> I already have a workaround in place for this which consists of automatically deploying as many watches as I need to send different mails. Those watches are derived from my watch definition. For this, I extended [run_test.py](https://github.com/elastic/examples/blob/master/Alerting/Sample%20Watches/run_test.py) to inject Python code after the watch definition is read. Not ideal, but it is maintainable.

* run_test.py: Add --metadata-git-commit switch to augment watch metadata

* run_test.py: Add --cacert parameter

* run_test.py: More useful error message if logging action did not run

* run_test.py: Use `git rev-parse --short HEAD` for --metadata-git-commit

* run_test.py: More useful error message if transform failed

* run_test.py: Implement --minify-scripts

Workaround for: elastic/elasticsearch#35184

* "Scripts may be no longer than 16384 characters." is in ES<v6.6 not >6.6

* run_test.py: Improve compatibility with ES 7.0.x and index templates

* run_test.py: Better error message if expected_response is not defined

* run_test.py: Show watch exception on execution failure

* run_test.py: In case a transform fails the transform input is relevant

* run_test.py: Support multiple time fields

Useful when you have two time fields that in reality should be very
close so in testing it is enough to set them to the same value.

* [run_test.py] ES 7 support. Update to Py3 and drop elasticsearch_xpack.

* [run_test.py] Add --verbose parameter to debug ES responses

* [run_test.py] Comply with Python Enhancement Proposals

* [run_test.py] Comply with reuse.software

* [run_test.py] Avoid `not` in condition to make it easier to understand

* [run_test.py] Use str.format instead of "%s" % for consistency

* [run_test.py] Fix ./run_all_tests.sh test run. All passing again.

* [run_test.py] Support nested fields in time_fields test parameter

Example:

```yaml
time_fields:
  - '@timestamp'
  - 'event.created'
```

* [run_test.py] Use dict.get shortcut
  • Loading branch information
ypid-geberit authored Jul 23, 2021
1 parent 17f0b1d commit dbc75a9
Show file tree
Hide file tree
Showing 4 changed files with 213 additions and 95 deletions.
20 changes: 11 additions & 9 deletions Alerting/Sample Watches/load_watch.sh
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#!/bin/bash

if [ -z "$1" ] ; then
echo "USAGE: load_watch.sh <watch_name> <optional_username> <optional_password> <optional_endpoint>:<optional_port> <optional_protocol>"
echo "eg: ./load_watch.sh port_scan elastic changeme my_remote_cluster.mydomain:9200 https"
Expand All @@ -19,11 +21,11 @@ fi
port=9200
endpoint=localhost
if [ "$4" ] ; then
if ":" in $4; then
endpoint=${4%":"*} # extractthe host value from the provided endpoint
if ":" in "$4"; then
endpoint=${4%":"*} # extract the host value from the provided endpoint
port=${4#*":"} # extract the port value if provided in endpoint:port format
if [ "$port" == "" ]; then
# if port is blank, due to endpoint provided as localhost: or no port providedthen use default port
# if port is blank, due to endpoint provided as localhost: or no port provided then use default port
port=9200
fi
else
Expand All @@ -46,13 +48,13 @@ fi
echo "Loading $1 scripts"

shopt -s nullglob
for script in $1/scripts/*.json
for script in "$1/scripts"/*.json
do
filename=$(basename "$script")
scriptname="${filename%.*}"
echo $scriptname
es_response=$(curl -H "Content-Type: application/json" -s -X POST $protocol$endpoint:$port/_scripts/$scriptname -u $username:$password -d @$script)
if [ 0 -eq $? ] && [ $es_response = '{"acknowledged":true}' ]; then
echo "$scriptname"
es_response=$(curl -H "Content-Type: application/json" -s -X POST "$protocol$endpoint:$port/_scripts/$scriptname" -u "$username:$password" -d "@$script")
if [ 0 -eq $? ] && [ "$es_response" = '{"acknowledged":true}' ]; then
echo "Loading $scriptname script...OK"
else
echo "Loading $scriptname script...FAILED"
Expand All @@ -62,9 +64,9 @@ done


echo "Removing existing $1 watch "
curl -H "Content-Type: application/json" -s -X DELETE $protocol$endpoint:$port/_xpack/watcher/watch/$1 -u $username:$password
curl -H "Content-Type: application/json" -s -X DELETE "$protocol$endpoint:$port/_xpack/watcher/watch/$1" -u "$username:$password"
echo "Loading $1 watch "
es_response=$(curl -H "Content-Type: application/json" --w "%{http_code}" -s -o /dev/null -X PUT $protocol$endpoint:$port/_xpack/watcher/watch/$1 -u $username:$password -d @$1/watch.json)
es_response=$(curl -H "Content-Type: application/json" --w "%{http_code}" -s -o /dev/null -X PUT "$protocol$endpoint:$port/_xpack/watcher/watch/$1" -u "$username:$password" -d "@$1/watch.json")
if [ 0 -eq $? ] && [ $es_response = "201" ]; then
echo "Loading $1 watch...OK"
exit 0
Expand Down
4 changes: 3 additions & 1 deletion Alerting/Sample Watches/run_all_tests.sh
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
#!/usr/bin/env bash
./run_test.sh '**' $1 $2 $3 $4 $5
set -o nounset -o pipefail -o errexit

./run_test.sh '**' "${1:-}" "${2:-}" "${3:-}" "${4:-}" "${5:-}"
241 changes: 179 additions & 62 deletions Alerting/Sample Watches/run_test.py
100644 → 100755
Original file line number Diff line number Diff line change
@@ -1,87 +1,204 @@
import sys
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# SPDX-FileCopyrightText: 2017 Dale McDiarmid <dalem@elastic.co>
# SPDX-FileCopyrightText: 2017-2020 Robin Schneider <robin.schneider@geberit.com>
# SPDX-FileCopyrightText: 2020 Dan Roscigno <dan@roscigno.com>
# SPDX-License-Identifier: Apache-2.0

__author__ = 'dalem@elastic.co'
from __future__ import (print_function, unicode_literals,
absolute_import, division)

import datetime
import json
import logging
import subprocess
import sys

import yaml

from elasticsearch7 import Elasticsearch
from elasticsearch7.client.ingest import IngestClient
import argparse
import json

parser = argparse.ArgumentParser(description='Index Connection Log data into ES with the last event at the current time')
parser.add_argument('--user',help='user')
parser.add_argument('--password',help='password')
parser.add_argument('--endpoint',help='endpoint')
parser.add_argument('--port',help='port')
parser.add_argument('--protocol',help='protocol')
parser.add_argument('--test_file',help='test file')

parser.set_defaults(endpoint='localhost',port="9200",protocol="http",test_file='data.json',user='elastic',password='changeme')
args = parser.parse_args()
es = Elasticsearch([args.protocol+"://"+args.endpoint+":"+args.port],http_auth=(args.user, args.password))

def find_item(list, key):
for item in list:
if key in item:
return item
return None

with open(args.test_file,'r') as test_file:
test=json.loads(test_file.read())
try:
es.indices.delete(test['index'])
except:
print("Unable to delete current dataset")
pass
with open(test['mapping_file'],'r') as mapping_file:
es.indices.create(index=test["index"],body=json.loads(mapping_file.read()))
params={}
if "ingest_pipeline_file" in test:
with open(test['ingest_pipeline_file'],'r') as ingest_pipeline_file:
pipeline=json.loads(ingest_pipeline_file.read())

def set_value_as_default_for_leaf(nested_dict, path_exp, value):
if len(path_exp) == 1:
nested_dict.setdefault(path_exp[0], value)
elif path_exp[0] in nested_dict:
set_value_as_default_for_leaf(nested_dict[path_exp[0]], path_exp[1:], value)


def load_file(serialized_file):
with open(serialized_file, 'r') as serialized_file_fh:
if serialized_file.endswith('.json'):
decoded_object = json.loads(serialized_file_fh.read())
elif serialized_file.endswith('.yml') or serialized_file.endswith('.yaml'):
decoded_object = yaml.safe_load(serialized_file_fh)
return decoded_object


if __name__ == '__main__':
import argparse

parser = argparse.ArgumentParser(description='Index Connection Log data into ES with the last event at the current time')
parser.add_argument('-v', '--verbose', help='verbose output', action='store_true')
parser.add_argument('--endpoint', help='endpoint')
parser.add_argument('--port', help='port')
parser.add_argument('--protocol', help='protocol')
parser.add_argument('--cacert', help='CA certificate to trust for HTTPS')
parser.add_argument('--user', help='user')
parser.add_argument('--password', help='password')
parser.add_argument('--test_file', help='test file')
parser.add_argument(
'--minify-scripts',
help='Minify script source code as workaround for' +
' "Scripts may be no longer than 16384 characters." in ES < v6.6.',
action='store_true')
# Ref: https://github.com/elastic/elasticsearch/pull/35184
parser.add_argument('--keep-index', help='Keep the index where test documents have been loaded to after the test', action='store_true')
parser.add_argument('--metadata-git-commit', help='Include the git commit hash in the metadata field of the watcher', action='store_true')
parser.add_argument('--modify-watch-by-eval', help='Python code to modify the watch before loading it into Elastic')
parser.add_argument(
'--no-test-index',
help='Don’t put the test data into an index.',
action='store_false',
dest='test_index')
parser.add_argument(
'--no-execute-watch',
help='Do not force watch execution. This can be useful when you use this script to deploy the watch.',
action='store_false',
dest='execute_watch')

parser.set_defaults(endpoint='localhost', port="9200", protocol="http", test_file='data.json', user='elastic', password='changeme')
args = parser.parse_args()

if args.verbose:
logging.basicConfig(level=logging.DEBUG)

es = Elasticsearch([args.protocol+"://"+args.endpoint+":"+args.port], http_auth=(args.user, args.password), ca_certs=args.cacert)

test = load_file(args.test_file)

if args.test_index:
# Load Mapping
try:
es.indices.delete(test['index'])
except Exception as err:
print("Unable to delete current dataset")
pass
index_template = load_file(test['mapping_file'])
for unneeded_keys in ['order', 'version', 'index_patterns']:
index_template.pop(unneeded_keys, None)
es.indices.create(index=test["index"], body=index_template)

# Load pipeline if its declared
params = {}
if "ingest_pipeline_file" in test:
pipeline = load_file(test['ingest_pipeline_file'])
p = IngestClient(es)
p.put_pipeline(id=test["watch_name"],body=pipeline)
params["pipeline"]=test["watch_name"]
current_data=last_time=datetime.datetime.utcnow()
i=0
time_field = test["time_field"] if "time_field" in test else "@timestamp"
for event in test['events']:
event_time=current_data+datetime.timedelta(seconds=int(event['offset'] if 'offset' in event else 0))
event[time_field]=event_time.strftime('%Y-%m-%dT%H:%M:%S.%fZ') if not time_field in event else event[time_field]
es.index(index=test['index'],body=event,id=event['id'] if "id" in event else i,params=params)
i+=1
es.indices.refresh(index=test["index"])
p.put_pipeline(id=test["watch_name"], body=pipeline)
params["pipeline"] = test["watch_name"]

# Index data
current_data = last_time = datetime.datetime.utcnow()
i = 0
time_fields = test.get('time_fields', test.get('time_field', '@timestamp'))
time_fields = set([time_fields] if isinstance(time_fields, str) else time_fields)
for event in test['events']:
# All offsets are in seconds.
event_time = current_data+datetime.timedelta(seconds=int(event.get('offset', 0)))
for time_field in time_fields:
time_field = time_field.split('.')
set_value_as_default_for_leaf(event, time_field, event_time.strftime('%Y-%m-%dT%H:%M:%S.%fZ'))
es.index(index=test['index'], body=event, id=event.get('id', i), params=params)
i += 1
es.indices.refresh(index=test["index"])

# Load Scripts
if 'scripts' in test:
for script in test['scripts']:
with open(script['path'], 'r') as script_file:
es.put_script(id=script["name"],body=json.loads(script_file.read()))
script_content = load_file(script['path'])
if args.minify_scripts:
# https://stackoverflow.com/questions/30795954/how-to-uglify-or-minify-c-code
p = subprocess.Popen(['gcc', '-fpreprocessed', '-dD', '-E', '-P', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
script_content['script']['source'] = p.communicate(input=script_content['script']['source'].encode('utf-8'))[0].decode('utf-8')
es.put_script(id=script["name"], body=script_content)

with open(test['watch_file'],'r') as watch_file:
watch=json.loads(watch_file.read())
es.watcher.put_watch(id=test["watch_name"],body=watch)
response=es.watcher.execute_watch(id=test["watch_name"])
# Load Watch and Execute
watch = load_file(test['watch_file'])

if args.modify_watch_by_eval:
eval(compile(args.modify_watch_by_eval, '<string>', 'exec'))

if args.metadata_git_commit:
watch.setdefault('metadata', {})
watch['metadata']['git_commit_hash'] = subprocess.check_output(['git', 'rev-parse', '--short', 'HEAD']).strip()
watch['metadata']['git_uncommitted_changes'] = True if len(subprocess.check_output(['git', 'status', '--porcelain']).strip()) > 0 else False

es.watcher.put_watch(id=test["watch_name"], body=watch)

if args.execute_watch:
response = es.watcher.execute_watch(id=test["watch_name"])

# Cleanup after the test to not pollute the environment for other tests.
if not args.keep_index:
try:
es.indices.delete(test['index'])
except Exception as err:
print("Unable to delete current dataset")
pass

# Confirm Matches
match = test['match'] if 'match' in test else True
print("Expected: Watch Condition: %s"%match)
if not 'condition' in response['watch_record']['result']:
print("Condition not evaluated due to watch error")
print("Expected: Watch Condition: {}".format(match))
if 'condition' not in response['watch_record']['result']:
print("Condition not evaluated due to watch error: {}".format(
json.dumps(response['watch_record'], sort_keys=True, indent=2)
))
print("TEST FAIL")
sys.exit(1)
met=response['watch_record']['result']['condition']['met']
print("Received: Watch Condition: %s"%met)
met = response['watch_record']['result']['condition']['met']
print("Received: Watch Condition: {}".format(met))
if match:
if met and response['watch_record']['result']['condition']['status'] == "success":
print("Expected: %s"%test['expected_response'])
logging=find_item(response['watch_record']['result']['actions'],'logging')['logging']
print("Expected: {}".format(test.get('expected_response')))
if len(response['watch_record']['result']['actions']) == 0:
if response['watch_record']['result']['transform']['status'] == 'failure':
print("No actions where taken because transform failed: {}".format(
json.dumps(response['watch_record']['result'], sort_keys=True, indent=2)
))
else:
print("No actions where taken: {}".format(
json.dumps(response['watch_record']['result'], sort_keys=True, indent=2)
))
print("TEST FAIL")
sys.exit(1)

logging_action = next((action for action in response['watch_record']['result']['actions'] if action["type"] == "logging"), None)
if logging_action is None:
print("No logging actions was taken. This test framework uses the logging action for comparison so you might need enable this action.")
print("TEST FAIL")
sys.exit(1)
if logging_action.get('transform', {}).get('status', 'success') != 'success':
print("Logging transform script failed: {}".format(
json.dumps(logging_action.get('transform', {}), sort_keys=True, indent=2),
))
print("TEST FAIL")
sys.exit(1)
if 'logging' not in logging_action:
print("Logging action is not present: {}".format(logging_action))
print("TEST FAIL")
sys.exit(1)
logging = logging_action['logging']
if logging:
print("Received: %s"%logging['logged_text'])
if logging['logged_text'] == test['expected_response']:
print("Received: {}".format(logging['logged_text']))
if logging['logged_text'] == test.get('expected_response'):
print("TEST PASS")
sys.exit(0)
else:
print("Logging action required for testing")
print("TEST FAIL")
sys.exit(1)
else:
print("TEST %s"%("PASS" if not response['watch_record']['result']['condition']['met'] else "FAIL"))
print("TEST {}".format("FAIL" if response['watch_record']['result']['condition']['met'] else "PASS"))
sys.exit(met)
43 changes: 20 additions & 23 deletions Alerting/Sample Watches/run_test.sh
Original file line number Diff line number Diff line change
@@ -1,25 +1,26 @@
if [ -z "$1" ]; then
echo "Specify watch name e.g. run_test.sh <foldername>"
echo "Specify watch name e.g. run_test.sh <foldername>"
exit 1
fi

username=elastic
if [ "$2" ] ; then
username=$2
username="$2"
fi

password=changeme
if [ "$3" ] ; then
password=$3
password="$3"
fi

port=9200
endpoint=localhost
if [ "$4" ] ; then
if ":" in $4; then
endpoint=${4%":"*} # extractthe host value from the provided endpoint
if ":" in "$4"; then
endpoint=${4%":"*} # extract the host value from the provided endpoint
port=${4#*":"} # extract the port value if provided in endpoint:port format
if [ "$port" == "" ]; then
# if port is blank, due to endpoint provided as localhost: or no port providedthen use default port
# if port is blank, due to endpoint provided as localhost: or no port provided then use default port
port=9200
fi
else
Expand All @@ -36,26 +37,22 @@ num_tests=0
pass=0
fails=0
echo "--------------------------------------------------"
for test in `ls $1/tests/*.json`; do
echo "Running test $test"
python3 run_test.py --user $username --password $password --endpoint $endpoint --port $port --protocol $protocol --test_file $test
# shellcheck disable=SC2231
for test in $1/tests/*.json; do
echo "Running test $test"

if [ $? -eq 0 ]; then
let pass=pass+1
else
let fails=fails+1
fi
let num_tests=num_tests+1
echo "--------------------------------------------------"
done;
if python3 run_test.py --user "$username" --password "$password" --endpoint "$endpoint" --port "$port" --protocol "$protocol" --test_file "$test"; then
pass=$(( pass+1 ))
else
fails=$(( fails+1 ))
fi
num_tests=$(( num_tests+1 ))
echo "--------------------------------------------------"
done

echo "$num_tests tests run: $pass passed. $fails failed."
if [ $fails -eq 0 ]; then
exit 0
exit 0
else
exit 1
exit 1
fi




0 comments on commit dbc75a9

Please sign in to comment.