From 6cda978e5bc98baf69223840b35688a76b2c58cc Mon Sep 17 00:00:00 2001 From: Ryan Deivert Date: Wed, 30 Aug 2017 17:57:36 -0700 Subject: [PATCH 01/15] [metrics] adding total processed size to metrics --- stream_alert/rule_processor/handler.py | 7 +++++++ stream_alert/shared/metrics.py | 2 ++ 2 files changed, 9 insertions(+) diff --git a/stream_alert/rule_processor/handler.py b/stream_alert/rule_processor/handler.py index f80f6e3a6..166d94153 100644 --- a/stream_alert/rule_processor/handler.py +++ b/stream_alert/rule_processor/handler.py @@ -53,6 +53,7 @@ def __init__(self, context, enable_alert_processor=True): self.enable_alert_processor = enable_alert_processor self._failed_record_count = 0 + self._processed_size = 0 self._alerts = [] def run(self, event): @@ -103,6 +104,10 @@ def run(self, event): self._process_alerts(payload) + MetricLogger.log_metric(FUNCTION_NAME, + MetricLogger.TOTAL_PROCESSED_SIZE, + self._processed_size) + LOGGER.debug('Invalid record count: %d', self._failed_record_count) MetricLogger.log_metric(FUNCTION_NAME, @@ -135,6 +140,8 @@ def _process_alerts(self, payload): payload (StreamPayload): StreamAlert payload object being processed """ for record in payload.pre_parse(): + # Increment the processed size using the length of this record + self._processed_size += len(record.pre_parsed_record) self.classifier.classify_record(record) if not record.valid: if self.env['lambda_alias'] != 'development': diff --git a/stream_alert/shared/metrics.py b/stream_alert/shared/metrics.py index b8a323dce..f79c74f03 100644 --- a/stream_alert/shared/metrics.py +++ b/stream_alert/shared/metrics.py @@ -45,6 +45,7 @@ class MetricLogger(object): # Constant metric names used for CloudWatch FAILED_PARSES = 'FailedParses' S3_DOWNLOAD_TIME = 'S3DownloadTime' + TOTAL_PROCESSED_SIZE = 'TotalProcessedSize' TOTAL_RECORDS = 'TotalRecords' TOTAL_S3_RECORDS = 'TotalS3Records' TRIGGERED_ALERTS = 'TriggeredAlerts' @@ -64,6 +65,7 @@ class MetricLogger(object): 
RULE_PROCESSOR_NAME: { FAILED_PARSES: (_default_filter.format(FAILED_PARSES), _default_value_lookup), S3_DOWNLOAD_TIME: (_default_filter.format(S3_DOWNLOAD_TIME), _default_value_lookup), + TOTAL_PROCESSED_SIZE: (_default_filter.format(TOTAL_PROCESSED_SIZE), _default_value_lookup), TOTAL_RECORDS: (_default_filter.format(TOTAL_RECORDS), _default_value_lookup), TOTAL_S3_RECORDS: (_default_filter.format(TOTAL_S3_RECORDS), _default_value_lookup), TRIGGERED_ALERTS: (_default_filter.format(TRIGGERED_ALERTS), _default_value_lookup) From 98516298897e5e76403bb27650d533419f96ff18 Mon Sep 17 00:00:00 2001 From: Ryan Deivert Date: Fri, 1 Sep 2017 13:43:36 -0700 Subject: [PATCH 02/15] [cli] cli support for adding metric alarms for predefined metrics * Fixing bug in help string for validate-schemas command --- manage.py | 248 +++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 244 insertions(+), 4 deletions(-) diff --git a/manage.py b/manage.py index d911443ee..f82bfc748 100755 --- a/manage.py +++ b/manage.py @@ -25,7 +25,9 @@ """ from argparse import ArgumentParser, RawTextHelpFormatter, SUPPRESS as ARGPARSE_SUPPRESS import os +import re +from stream_alert.shared import metrics from stream_alert_cli import __version__ as version from stream_alert_cli.logger import LOGGER_CLI from stream_alert_cli.runner import cli_runner @@ -113,9 +115,8 @@ def _add_live_test_subparser(subparsers): live_test_parser.set_defaults(command='live-test') # get cluster choices from available files - clusters = [] - for _, _, files in os.walk('conf/clusters'): - clusters.extend(os.path.splitext(cluster)[0] for cluster in files) + clusters = [os.path.splitext(cluster)[0] for _, _, files + in os.walk('conf/clusters') for cluster in files] # add clusters for user to pick from live_test_parser.add_argument( @@ -145,7 +146,8 @@ def _add_validate_schema_subparser(subparsers): schema_validation_usage = 'manage.py validate-schemas [options]' schema_validation_description = (""" StreamAlertCLI v{} 
-Run end-to-end tests +Run validation of schemas in logs.json using configured integration test files. Validation +does not actually run the rules engine on test events. Available Options: @@ -190,6 +192,243 @@ ) +def _add_metric_alarm_subparser(subparsers): + """Add the create-alarm subparser: manage.py create-alarm [options]""" + metric_alarm_usage = 'manage.py create-alarm [options]' + + # get the available metrics to be used + available_metrics = metrics.MetricLogger.get_available_metrics() + all_metrics = [metric for func in available_metrics for metric in available_metrics[func]] + + metric_choices_block = ('\n').join('{:>35}{}'.format('', metric) for metric in all_metrics) + + metric_alarm_description = (""" +StreamAlertCLI v{} +Add a CloudWatch alarm for predefined metrics. These are saved in the config and +Terraform is used to create the alarms. + +Required Arguments: + + -m/--metric The predefined metric to assign this alarm to. Choices are: +{} + -mt/--metric-target The target of this metric alarm, meaning either the cluster metric + or the aggregate metric. Choices are: + cluster + aggregate + all + -co/--comparison-operator Comparison operator to use for this metric. Choices are: + GreaterThanOrEqualToThreshold + GreaterThanThreshold + LessThanThreshold + LessThanOrEqualToThreshold + -an/--alarm-name The name for the alarm. This name must be unique within the AWS account + -ep/--evaluation-periods The number of periods over which data is compared to the specified + threshold. The minimum value for this is 1. Also see the 'Other + Constraints' section below + -p/--period The period, in seconds, over which the specified statistic is applied. + Valid values are any multiple of 60. Also see the 'Other Constraints' + section below + -t/--threshold The value against which the specified statistic is compared. This value + should be a double. 
+ +Optional Arguments: + + -ad/--alarm-description The description for the alarm + -c/--clusters Space delimited list of clusters to apply this metric to. This is + ignored if the --metric-target of 'aggregate' is used. + -es/--extended-statistic The percentile statistic for the metric associated with the alarm. + Specify a value between p0.0 and p100. Cannot be used in conjunction + with the --statistic flag. + -s/--statistic The statistic for the metric associated with the alarm, other than + percentile. For percentile statistics, use --extended-statistic. + Cannot be used in conjunction with the --extended-statistic flag + Choices are: + SampleCount + Average + Sum + Minimum + Maximum + --debug Enable Debug logger output + +Other Constraints: + + The product of the value for period multiplied by the value for evaluation periods cannot + exceed 86,400. 86,400 is the number of seconds in one day and an alarm's total current + evaluation period can be no longer than one day. + +Examples: + + manage.py create-alarm \\ + --metric FailedParses \\ + --metric-target cluster \\ + --comparison-operator GreaterThanOrEqualToThreshold \\ + --alarm-name FailedParsesAlarm \\ + --evaluation-periods 1 \\ + --period 300 \\ + --threshold 1.0 \\ + --alarm-description 'Alarm for any failed parses that occur within a 5 minute period in the prod cluster' \\ + --clusters prod \\ + --statistic Sum + +Resources: + + AWS: https://docs.aws.amazon.com/AmazonCloudWatch/latest/APIReference/API_PutMetricAlarm.html + Terraform: https://www.terraform.io/docs/providers/aws/r/cloudwatch_metric_alarm.html + +""".format(version, metric_choices_block)) + metric_alarm_parser = subparsers.add_parser( + 'create-alarm', + description=metric_alarm_description, + usage=metric_alarm_usage, + formatter_class=RawTextHelpFormatter, + help=ARGPARSE_SUPPRESS + ) + + # Set the name of this parser to 'validate-schemas' + metric_alarm_parser.set_defaults(command='create-alarm') + + # add all the required parameters 
+ # add metrics for user to pick from. Will be mapped to 'metric_name' in terraform + metric_alarm_parser.add_argument( + '-m', '--metric', + choices=all_metrics, + help=ARGPARSE_SUPPRESS, + required=True + ) + + # check to see what the user wants to apply this metric to (cluster, aggregate, or both) + metric_alarm_parser.add_argument( + '-mt', '--metric-target', + choices=['cluster', 'aggregate', 'all'], + help=ARGPARSE_SUPPRESS, + required=True + ) + + # get the comparison type for this metric + metric_alarm_parser.add_argument( + '-co', '--comparison-operator', + choices=['GreaterThanOrEqualToThreshold', 'GreaterThanThreshold', + 'LessThanThreshold', 'LessThanOrEqualToThreshold'], + help=ARGPARSE_SUPPRESS, + required=True + ) + + # get the name of the alarm + def _alarm_name_validator(val): + if not 1 <= len(val) <= 255: + raise metric_alarm_parser.error('alarm name length must be between 1 and 255') + return val + + metric_alarm_parser.add_argument( + '-an', '--alarm-name', + help=ARGPARSE_SUPPRESS, + required=True, + type=_alarm_name_validator + ) + + # get the evaluation period for this alarm + def _alarm_eval_periods_validator(val): + error = 'evaluation periods must be an integer greater than 0' + try: + period = int(val) + except ValueError: + raise metric_alarm_parser.error(error) + + if period <= 0: + raise metric_alarm_parser.error(error) + return period + + metric_alarm_parser.add_argument( + '-ep', '--evaluation-periods', + help=ARGPARSE_SUPPRESS, + required=True, + type=_alarm_eval_periods_validator + ) + + # get the period for this alarm + def _alarm_period_validator(val): + error = 'period must be an integer in multiples of 60' + try: + period = int(val) + except ValueError: + raise metric_alarm_parser.error(error) + + if period <= 0 or period % 60 != 0: + raise metric_alarm_parser.error(error) + + return period + + metric_alarm_parser.add_argument( + '-p', '--period', + help=ARGPARSE_SUPPRESS, + required=True, + type=_alarm_period_validator + ) + 
+ # get the threshold for this alarm + metric_alarm_parser.add_argument( + '-t', '--threshold', + help=ARGPARSE_SUPPRESS, + required=True, + type=float + ) + + # all other optional flags + # get the optional alarm description + def _alarm_description_validator(val): + if len(val) > 1024: + raise metric_alarm_parser.error('alarm description length must be less than 1024') + return val + + metric_alarm_parser.add_argument( + '-ad', '--alarm-description', + help=ARGPARSE_SUPPRESS, + type=_alarm_description_validator + ) + + # get cluster choices from available files + clusters = [os.path.splitext(cluster)[0] for _, _, files + in os.walk('conf/clusters') for cluster in files] + + # allow the user to select 0 or more clusters to apply this alarm to + metric_alarm_parser.add_argument( + '-c', '--clusters', + choices=clusters, + help=ARGPARSE_SUPPRESS, + nargs='+', + default=[] + ) + + # get the extended statistic or statistic value + statistic_group = metric_alarm_parser.add_mutually_exclusive_group() + def _extended_stat_validator(val): + if not re.search(r'p(\d{1,2}(\.\d{0,2})?|100)$', val): + raise metric_alarm_parser.error('extended statistic values must start with \'p\' ' + 'and be followed by a percentage value (ie: p0.0, ' + 'p10, p55.5, p100)') + return val + + statistic_group.add_argument( + '-es', '--extended-statistic', + help=ARGPARSE_SUPPRESS, + type=_extended_stat_validator + ) + + statistic_group.add_argument( + '-s', '--statistic', + choices=['SampleCount', 'Average', 'Sum', 'Minimum', 'Maximum'], + help=ARGPARSE_SUPPRESS + ) + + # allow verbose output for the CLI with the --debug option + metric_alarm_parser.add_argument( + '--debug', + action='store_true', + help=ARGPARSE_SUPPRESS + ) + + + def _add_lambda_subparser(subparsers): """Add the Lambda subparser: manage.py lambda [subcommand] [options]""" lambda_usage = 'manage.py lambda [subcommand] [options]' @@ -478,6 +717,7 @@ def build_parser(): _add_output_subparser(subparsers) 
_add_live_test_subparser(subparsers) _add_validate_schema_subparser(subparsers) + _add_metric_alarm_subparser(subparsers) _add_lambda_subparser(subparsers) _add_terraform_subparser(subparsers) _add_configure_subparser(subparsers) From 451694bdcfced918ba52009eda4f5869bb81472d Mon Sep 17 00:00:00 2001 From: Ryan Deivert Date: Fri, 1 Sep 2017 17:05:50 -0700 Subject: [PATCH 03/15] [cli] writing cloudwatch alarm info to global config --- manage.py | 17 +++++++++-------- stream_alert_cli/config.py | 34 ++++++++++++++++++++++++++++++++++ stream_alert_cli/runner.py | 30 ++++++++++++++++++++++++++++++ 3 files changed, 73 insertions(+), 8 deletions(-) diff --git a/manage.py b/manage.py index f82bfc748..acb37fed9 100755 --- a/manage.py +++ b/manage.py @@ -221,15 +221,16 @@ def _add_metric_alarm_subparser(subparsers): GreaterThanThreshold LessThanThreshold LessThanOrEqualToThreshold - -an/--alarm-name The name for the alarm. This name must be unique within the AWS account + -an/--alarm-name The name for the alarm. This name must be unique within the AWS + account -ep/--evaluation-periods The number of periods over which data is compared to the specified threshold. The minimum value for this is 1. Also see the 'Other Constraints' section below - -p/--period The period, in seconds, over which the specified statistic is applied. - Valid values are any multiple of 60. Also see the 'Other Constraints' - section below - -t/--threshold The value against which the specified statistic is compared. This value - should be a double. + -p/--period The period, in seconds, over which the specified statistic is + applied. Valid values are any multiple of 60. Also see the + 'Other Constraints' section below + -t/--threshold The value against which the specified statistic is compared. This + value should be a double. Optional Arguments: @@ -237,8 +238,8 @@ def _add_metric_alarm_subparser(subparsers): -c/--clusters Space delimited list of clusters to apply this metric to. 
This is ignored if the --metric-target of 'aggregate' is used. -es/--extended-statistic The percentile statistic for the metric associated with the alarm. - Specify a value between p0.0 and p100. Cannot be used in conjunction - with the --statistic flag. + Specify a value between p0.0 and p100. Cannot be used in + conjunction with the --statistic flag. -s/--statistic The statistic for the metric associated with the alarm, other than percentile. For percentile statistics, use --extended-statistic. Cannot be used in conjunction with the --extended-statistic flag diff --git a/stream_alert_cli/config.py b/stream_alert_cli/config.py index 631dc9e7e..8e7a2518b 100644 --- a/stream_alert_cli/config.py +++ b/stream_alert_cli/config.py @@ -39,6 +39,7 @@ def __getitem__(self, key): def __setitem__(self, key, new_value): self.config.__setitem__(key, new_value) + print 'setting', key self.write() def get(self, key): @@ -126,6 +127,39 @@ def set_aws_account_id(self, aws_account_id): LOGGER_CLI.info('AWS Account ID successfully configured') + def add_metric_alarm(self, alarm_info): + """Add a metric alarm that corresponds to a predefined metrics""" + metrics = self.config['global']['infrastructure'].get('metrics') + if not metrics: + self.config['global']['infrastructure']['metrics'] = {} + + # Check to see if metrics are enabled. If they are not, then alarms are useless. + enable_metrics = self.config['global']['infrastructure']['metrics'].get('enabled', False) + + if not enable_metrics: + LOGGER_CLI.error('Metrics are not currently enabled in \'conf/global.json\'. ' + 'Metrics must be enabled to create alarms.') + return + + current_alarms = self.config['global']['infrastructure']['metrics'].get('alarms', {}) + + if alarm_info['alarm_name'] in current_alarms: + LOGGER_CLI.error('Alarm name \'%s\'already defined. 
Please remove the previous alarm ' + 'from \'conf/global.json\' or pick a different name.', + alarm_info['alarm_name']) + return + + omitted_keys = {'debug', 'alarm_name', 'command'} + + current_alarms[alarm_info['alarm_name']] = { + key: value for key, value in alarm_info.iteritems() + if key not in omitted_keys and value is not None + } + + # Add the alarms to the config + self.config['global']['infrastructure']['metrics']['alarms'] = current_alarms + self.write() + def load(self): """Load the cluster, global, and lambda configuration files diff --git a/stream_alert_cli/runner.py b/stream_alert_cli/runner.py index 75c6b2d8f..5cd1c0726 100644 --- a/stream_alert_cli/runner.py +++ b/stream_alert_cli/runner.py @@ -73,6 +73,9 @@ def cli_runner(options): elif options.command == 'athena': athena_handler(options) + elif options.command == 'create-alarm': + _create_alarm(options) + def athena_handler(options): """Handle Athena operations""" @@ -649,3 +652,30 @@ def configure_output(options): 'output configuration for service \'%s\'', props['descriptor'].value, options.service) + +def _create_alarm(options): + """Create a new CloudWatch alarm for the given metric + + Args: + options (argparser): Contains all of the necessary info for configuring + a CloudWatch alarm + """ + # Perform safety check for max total evaluation period. This logic cannot + # be performed by argparse so must be performed now. + seconds_in_day = 86400 + if options.period * options.evaluation_periods > seconds_in_day: + LOGGER_CLI.error('The product of the value for period multiplied by the ' + 'value for evaluation periods cannot exceed 86,400. 86,400 ' + 'is the number of seconds in one day and an alarm\'s total ' + 'current evaluation period can be no longer than one day.') + return + + # Check to see if the user is specifying clusters when trying to create an + # alarm on an aggregate metric. 
Aggregate metrics encompass all clusters so + # specification of clusters doesn't have any real effect + if options.metric_target == 'aggregate' and options.clusters: + LOGGER_CLI.error('Specifying clusters when creating an alarm on an aggregate ' + 'metric has no effect. Please remove the -c/--clusters flag.') + return + + CONFIG.add_metric_alarm(vars(options)) From 3504c50552d0638367cf51a5a4bd57dae6b74050 Mon Sep 17 00:00:00 2001 From: Ryan Deivert Date: Tue, 5 Sep 2017 12:06:48 -0700 Subject: [PATCH 04/15] [cli] adding logic to allow users to overwrite existing alarms if desired --- stream_alert_cli/config.py | 17 ++++++++++------- stream_alert_cli/helpers.py | 11 +++++++++++ stream_alert_cli/runner.py | 14 +++----------- 3 files changed, 24 insertions(+), 18 deletions(-) diff --git a/stream_alert_cli/config.py b/stream_alert_cli/config.py index 8e7a2518b..cde9a12db 100644 --- a/stream_alert_cli/config.py +++ b/stream_alert_cli/config.py @@ -19,6 +19,7 @@ import re import sys +from stream_alert_cli.helpers import continue_prompt from stream_alert_cli.logger import LOGGER_CLI @@ -137,17 +138,19 @@ def add_metric_alarm(self, alarm_info): enable_metrics = self.config['global']['infrastructure']['metrics'].get('enabled', False) if not enable_metrics: - LOGGER_CLI.error('Metrics are not currently enabled in \'conf/global.json\'. ' - 'Metrics must be enabled to create alarms.') - return + prompt = ('Metrics are not currently enabled in \'conf/global.json\'. Creating a ' + 'metric alarm will have no effect until metrics are enabled. ' + 'Would you like to continue anyway?') + if not continue_prompt(prompt): + return current_alarms = self.config['global']['infrastructure']['metrics'].get('alarms', {}) if alarm_info['alarm_name'] in current_alarms: - LOGGER_CLI.error('Alarm name \'%s\'already defined. 
Please remove the previous alarm ' - 'from \'conf/global.json\' or pick a different name.', - alarm_info['alarm_name']) - return + prompt = ('Alarm name \'{}\' already defined. Would you ' + 'like to overwrite?').format(alarm_info['alarm_name']) + if not continue_prompt(prompt): + return omitted_keys = {'debug', 'alarm_name', 'command'} diff --git a/stream_alert_cli/helpers.py b/stream_alert_cli/helpers.py index 6229dff51..b4fa817c0 100644 --- a/stream_alert_cli/helpers.py +++ b/stream_alert_cli/helpers.py @@ -67,6 +67,17 @@ def run_command(runner_args, **kwargs): return True +def continue_prompt(prompt=''): + """Continue prompt used to check user's response""" + required_responses = {'yes', 'no'} + response = '' + while response not in required_responses: + prompt = prompt or 'Would you like to continue?' + response = raw_input('\n{} (yes or no): '.format(prompt)) + + return response == 'yes' + + def format_lambda_test_record(test_record): """Create a properly formatted Kinesis, S3, or SNS record. diff --git a/stream_alert_cli/runner.py b/stream_alert_cli/runner.py index 5cd1c0726..59a4fa1e4 100644 --- a/stream_alert_cli/runner.py +++ b/stream_alert_cli/runner.py @@ -25,6 +25,7 @@ from stream_alert_cli import helpers from stream_alert_cli.config import CLIConfig +from stream_alert_cli.helpers import continue_prompt from stream_alert_cli.logger import LOGGER_CLI import stream_alert_cli.outputs as config_outputs from stream_alert_cli.package import AlertProcessorPackage, AthenaPackage, RuleProcessorPackage @@ -318,16 +319,6 @@ def run_command(args=None, **kwargs): return helpers.run_command(args, **kwargs) -def continue_prompt(): - """Continue prompt used before applying Terraform plans""" - required_responses = {'yes', 'no'} - response = '' - while response not in required_responses: - response = raw_input('\nWould you like to continue? 
(yes or no): ') - if response == 'no': - sys.exit(0) - - def tf_runner(**kwargs): """Terraform wrapper to build StreamAlert infrastructure. @@ -363,7 +354,8 @@ def tf_runner(**kwargs): if not run_command(tf_command): return False - continue_prompt() + if not continue_prompt(): + sys.exit(0) if action == 'destroy': LOGGER_CLI.info('Destroying infrastructure') From a9f97844685d578fe8b8e435473ee593da106234 Mon Sep 17 00:00:00 2001 From: Ryan Deivert Date: Tue, 5 Sep 2017 14:24:27 -0700 Subject: [PATCH 05/15] [cli] adding ability to toggle metrics via the cli --- manage.py | 56 ++++++++++++++++++++++++++++++++++++++ stream_alert_cli/config.py | 10 +++++++ stream_alert_cli/runner.py | 13 +++++++++ 3 files changed, 79 insertions(+) diff --git a/manage.py b/manage.py index acb37fed9..baa7525a4 100755 --- a/manage.py +++ b/manage.py @@ -192,6 +192,60 @@ def _add_validate_schema_subparser(subparsers): ) +def _add_metrics_subparser(subparsers): + """Add the metrics subparser: manage.py metrics [options]""" + metrics_usage = 'manage.py metrics [options]' + + metrics_description = (""" +StreamAlertCLI v{} +Enable or disable metrics for all lambda functions. This toggles the creation of metric filters. 
+ +Available Options: + + -e/--enable Enable CloudWatch metrics through logging and metric filters + -d/--disable Disable CloudWatch metrics through logging and metric filters + --debug Enable Debug logger output + +Examples: + + manage.py metrics --enable + +""".format(version)) + + metrics_parser = subparsers.add_parser( + 'metrics', + description=metrics_description, + usage=metrics_usage, + formatter_class=RawTextHelpFormatter, + help=ARGPARSE_SUPPRESS + ) + + # Set the name of this parser to 'metrics' + metrics_parser.set_defaults(command='metrics') + + # get the metric toggle value + toggle_group = metrics_parser.add_mutually_exclusive_group(required=True) + + toggle_group.add_argument( + '-e', '--enable', + dest='enable_metrics', + action='store_true' + ) + + toggle_group.add_argument( + '-d', '--disable', + dest='enable_metrics', + action='store_false' + ) + + # allow verbose output for the CLI with the --debug option + metrics_parser.add_argument( + '--debug', + action='store_true', + help=ARGPARSE_SUPPRESS + ) + + def _add_metric_alarm_subparser(subparsers): """Add the create-alarm subparser: manage.py create-alarm [options]""" metric_alarm_usage = 'manage.py create-alarm [options]' @@ -277,6 +331,7 @@ def _add_metric_alarm_subparser(subparsers): Terraform: https://www.terraform.io/docs/providers/aws/r/cloudwatch_metric_alarm.html """.format(version, metric_choices_block)) + metric_alarm_parser = subparsers.add_parser( 'create-alarm', description=metric_alarm_description, @@ -718,6 +773,7 @@ def build_parser(): _add_output_subparser(subparsers) _add_live_test_subparser(subparsers) _add_validate_schema_subparser(subparsers) + _add_metrics_subparser(subparsers) _add_metric_alarm_subparser(subparsers) _add_lambda_subparser(subparsers) _add_terraform_subparser(subparsers) diff --git a/stream_alert_cli/config.py b/stream_alert_cli/config.py index cde9a12db..7142e17b6 100644 --- a/stream_alert_cli/config.py +++ b/stream_alert_cli/config.py @@ -128,6 +128,16 @@ 
def set_aws_account_id(self, aws_account_id): LOGGER_CLI.info('AWS Account ID successfully configured') + def toggle_metrics(self, enabled): + """Toggle CloudWatch metric logging and filter creation""" + metrics = self.config['global']['infrastructure'].get('metrics') + if not metrics: + self.config['global']['infrastructure']['metrics'] = {} + + metrics['enabled'] = enabled + + self.write() + def add_metric_alarm(self, alarm_info): """Add a metric alarm that corresponds to a predefined metrics""" metrics = self.config['global']['infrastructure'].get('metrics') diff --git a/stream_alert_cli/runner.py b/stream_alert_cli/runner.py index 59a4fa1e4..1a81b5170 100644 --- a/stream_alert_cli/runner.py +++ b/stream_alert_cli/runner.py @@ -74,6 +74,9 @@ def cli_runner(options): elif options.command == 'athena': athena_handler(options) + elif options.command == 'metrics': + _toggle_metrics(options) + elif options.command == 'create-alarm': _create_alarm(options) @@ -645,6 +648,16 @@ def configure_output(options): props['descriptor'].value, options.service) + +def _toggle_metrics(options): + """Enable or disable logging CloudWatch metrics + + Args: + options (argparser): Contains boolean necessary for toggling metrics + """ + CONFIG.toggle_metrics(options.enable_metrics) + + def _create_alarm(options): """Create a new CloudWatch alarm for the given metric From 0f0473033aa0769056246d9f6082c9de349a9cb0 Mon Sep 17 00:00:00 2001 From: Ryan Deivert Date: Wed, 6 Sep 2017 17:17:50 -0700 Subject: [PATCH 06/15] [metrics] making metrics configurable per function/cluster and not a global option --- conf/clusters/prod.json | 6 ++++-- conf/global.json | 3 --- docs/source/athena-deploy.rst | 2 ++ stream_alert_cli/terraform_generate.py | 30 ++++++++++++++------------ 4 files changed, 22 insertions(+), 19 deletions(-) diff --git a/conf/clusters/prod.json b/conf/clusters/prod.json index 2b66c9469..22ea232a3 100644 --- a/conf/clusters/prod.json +++ b/conf/clusters/prod.json @@ -25,13 +25,15 @@ 
"current_version": "$LATEST", "log_level": "info", "memory": 128, - "timeout": 10 + "timeout": 10, + "enable_metrics": true }, "rule_processor": { "current_version": "$LATEST", "log_level": "info", "memory": 128, - "timeout": 10 + "timeout": 30, + "enable_metrics": true } } }, diff --git a/conf/global.json b/conf/global.json index b7b743d4a..79952fda2 100644 --- a/conf/global.json +++ b/conf/global.json @@ -6,9 +6,6 @@ "region": "us-east-1" }, "infrastructure": { - "metrics": { - "enabled": true - }, "monitoring": { "create_sns_topic": true } diff --git a/docs/source/athena-deploy.rst b/docs/source/athena-deploy.rst index 999b766fc..0213ad529 100644 --- a/docs/source/athena-deploy.rst +++ b/docs/source/athena-deploy.rst @@ -55,6 +55,7 @@ Open ``conf/lambda.json``, and fill in the following ``Required`` options: Key Required Default Description ----------------------------------- -------- -------------------- ----------- ``enabled`` ``Yes`` ``true`` Enables/Disables the Athena Partition Refresh Lambda function +``enable_metrics`` ``No`` ``false`` Enables/Disables logging of metrics for the Athena Partition Refresh Lambda function ``log_level`` ``No`` ``info`` The log level for the Lambda function, can be either ``info`` or ``debug``. Debug will help with diagnosing errors with polling SQS or sending Athena queries. 
``memory`` ``No`` ``128`` The amount of memory (in MB) allocated to the Lambda function ``timeout`` ``No`` ``60`` The maximum duration of the Lambda function (in seconds) @@ -70,6 +71,7 @@ Key Required Default Descriptio { "athena_partition_refresh_config": { "enabled": true, + "enable_metrics": false, "log_level": "info", "memory": 128, "refresh_type": { diff --git a/stream_alert_cli/terraform_generate.py b/stream_alert_cli/terraform_generate.py index f37a63698..1c1708c95 100644 --- a/stream_alert_cli/terraform_generate.py +++ b/stream_alert_cli/terraform_generate.py @@ -216,8 +216,6 @@ def generate_stream_alert(cluster_name, cluster_dict, config): Returns: bool: Result of applying the stream_alert module """ - enable_metrics = config['global'].get('infrastructure', - {}).get('metrics', {}).get('enabled', False) account = config['global']['account'] modules = config['clusters'][cluster_name]['modules'] @@ -228,7 +226,8 @@ def generate_stream_alert(cluster_name, cluster_dict, config): 'prefix': account['prefix'], 'cluster': cluster_name, 'kms_key_arn': '${aws_kms_key.stream_alert_secrets.arn}', - 'rule_processor_enable_metrics': enable_metrics, + 'rule_processor_enable_metrics': modules['stream_alert'] \ + ['rule_processor']['enable_metrics'], 'rule_processor_log_level': modules['stream_alert'] \ ['rule_processor'].get('log_level', 'info'), 'rule_processor_memory': modules['stream_alert']['rule_processor']['memory'], @@ -236,7 +235,8 @@ def generate_stream_alert(cluster_name, cluster_dict, config): 'rule_processor_version': modules['stream_alert']['rule_processor']['current_version'], 'rule_processor_config': '${var.rule_processor_config}', 'alert_processor_config': '${var.alert_processor_config}', - 'alert_processor_enable_metrics': enable_metrics, + 'alert_processor_enable_metrics': modules['stream_alert'] \ + ['alert_processor']['enable_metrics'], 'alert_processor_log_level': modules['stream_alert'] \ ['alert_processor'].get('log_level', 'info'), 
'alert_processor_memory': modules['stream_alert']['alert_processor']['memory'], @@ -292,12 +292,7 @@ def generate_cloudwatch_log_metrics(cluster_name, cluster_dict, config): Returns: bool: Result of applying the cloudwatch metric filters to the stream_alert module """ - enable_metrics = config['global'].get('infrastructure', - {}).get('metrics', {}).get('enabled', False) - - # Do not add any metric filters if metrics are disabled - if not enable_metrics: - return + stream_alert_config = config['clusters'][cluster_name]['modules']['stream_alert'] current_metrics = metrics.MetricLogger.get_available_metrics() @@ -310,6 +305,15 @@ def generate_cloudwatch_log_metrics(cluster_name, cluster_dict, config): if func not in current_metrics: continue + if func not in stream_alert_config: + LOGGER_CLI.error('Function for metrics \'%s\' is not defined in stream alert config. ' + 'Options are: %s', func, + ', '.join('\'{}\''.format(key) for key in stream_alert_config)) + continue + + if not stream_alert_config[func].get('enable_metrics'): + continue + metric_prefix = funcs[func] filter_pattern_idx, filter_value_idx = 0, 1 @@ -661,8 +665,6 @@ def generate_athena(config): """ athena_dict = infinitedict() athena_config = config['lambda']['athena_partition_refresh_config'] - enable_metrics = config['global'].get('infrastructure', - {}).get('metrics', {}).get('enabled', False) data_buckets = set() for refresh_type in athena_config['refresh_type']: @@ -679,11 +681,11 @@ def generate_athena(config): 'athena_data_buckets': list(data_buckets), 'refresh_interval': athena_config.get('refresh_interval', 'rate(10 minutes)'), 'current_version': athena_config['current_version'], - 'enable_metrics': enable_metrics, + 'enable_metrics': athena_config.get('enable_metrics', False), 'prefix': config['global']['account']['prefix'] } - if not enable_metrics: + if not athena_config.get('enable_metrics', False): return athena_dict # Check to see if there are any metrics configured for the athena 
function From 860190c7e0afbfa8f68bd4d8f30167e5a8b8aa5f Mon Sep 17 00:00:00 2001 From: Ryan Deivert Date: Wed, 6 Sep 2017 17:21:01 -0700 Subject: [PATCH 07/15] [cli] simplifying some monitoring code --- stream_alert_cli/terraform_generate.py | 31 ++++++++++++-------------- 1 file changed, 14 insertions(+), 17 deletions(-) diff --git a/stream_alert_cli/terraform_generate.py b/stream_alert_cli/terraform_generate.py index 1c1708c95..bfd269e8d 100644 --- a/stream_alert_cli/terraform_generate.py +++ b/stream_alert_cli/terraform_generate.py @@ -20,6 +20,7 @@ from stream_alert.shared import metrics from stream_alert_cli.logger import LOGGER_CLI +DEFAULT_SNS_MONITORING_TOPIC = 'stream_alert_monitoring' RESTRICTED_CLUSTER_NAMES = ('main', 'athena') @@ -160,8 +161,8 @@ def generate_main(**kwargs): infrastructure_config = config['global'].get('infrastructure') if infrastructure_config and 'monitoring' in infrastructure_config: if infrastructure_config['monitoring'].get('create_sns_topic'): - main_dict['resource']['aws_sns_topic']['stream_alert_monitoring'] = { - 'name': 'stream_alert_monitoring' + main_dict['resource']['aws_sns_topic'][DEFAULT_SNS_MONITORING_TOPIC] = { + 'name': DEFAULT_SNS_MONITORING_TOPIC } return main_dict @@ -350,25 +351,21 @@ def generate_cloudwatch_monitoring(cluster_name, cluster_dict, config): """ prefix = config['global']['account']['prefix'] infrastructure_config = config['global'].get('infrastructure') - sns_topic_arn = None - if infrastructure_config and 'monitoring' in infrastructure_config: - if infrastructure_config['monitoring'].get('create_sns_topic'): - sns_topic_arn = 'arn:aws:sns:{region}:{account_id}:{topic}'.format( - region=config['global']['account']['region'], - account_id=config['global']['account']['aws_account_id'], - topic='stream_alert_monitoring' - ) - elif infrastructure_config['monitoring'].get('sns_topic_name'): - sns_topic_arn = 'arn:aws:sns:{region}:{account_id}:{topic}'.format( - 
region=config['global']['account']['region'], - account_id=config['global']['account']['aws_account_id'], - topic=infrastructure_config['monitoring']['sns_topic_name'] - ) - else: + if not (infrastructure_config and 'monitoring' in infrastructure_config): LOGGER_CLI.error('Invalid config: Make sure you declare global infrastructure options!') return False + topic_name = DEFAULT_SNS_MONITORING_TOPIC if infrastructure_config \ + ['monitoring'].get('create_sns_topic') else \ + infrastructure_config['monitoring'].get('sns_topic_name') + + sns_topic_arn = 'arn:aws:sns:{region}:{account_id}:{topic}'.format( + region=config['global']['account']['region'], + account_id=config['global']['account']['aws_account_id'], + topic=topic_name + ) + lambda_functions = [ '{}_{}_streamalert_rule_processor'.format(prefix, cluster_name), '{}_{}_streamalert_alert_processor'.format(prefix, cluster_name) From 38175cb5fa551bef2570cd7c70621f2194dea60f Mon Sep 17 00:00:00 2001 From: Ryan Deivert Date: Wed, 6 Sep 2017 18:47:08 -0700 Subject: [PATCH 08/15] [cli] updating cli to support toggling metrics per cluster/function * Adding some flags to the `manage.py metrics` command to accept a cluster and a function name * By default, function name is required while cluster is not (will default to all clusters) --- conf/clusters/prod.json | 7 ++--- manage.py | 62 +++++++++++++++++++++++++++++++++----- stream_alert_cli/config.py | 27 +++++++++++++---- stream_alert_cli/runner.py | 2 +- 4 files changed, 80 insertions(+), 18 deletions(-) diff --git a/conf/clusters/prod.json b/conf/clusters/prod.json index 22ea232a3..d8bf99796 100644 --- a/conf/clusters/prod.json +++ b/conf/clusters/prod.json @@ -25,15 +25,14 @@ "current_version": "$LATEST", "log_level": "info", "memory": 128, - "timeout": 10, - "enable_metrics": true + "timeout": 10 }, "rule_processor": { "current_version": "$LATEST", + "enable_metrics": true, "log_level": "info", "memory": 128, - "timeout": 30, - "enable_metrics": true + "timeout": 30 
} } }, diff --git a/manage.py b/manage.py index baa7525a4..3d70f136e 100755 --- a/manage.py +++ b/manage.py @@ -23,7 +23,7 @@ terraform -var-file=../terraform.tfvars -var-file=../variables.json """ -from argparse import ArgumentParser, RawTextHelpFormatter, SUPPRESS as ARGPARSE_SUPPRESS +from argparse import Action, ArgumentParser, RawTextHelpFormatter, SUPPRESS as ARGPARSE_SUPPRESS import os import re @@ -33,6 +33,13 @@ from stream_alert_cli.runner import cli_runner +class UniqueSetAction(Action): + """Subclass of argparse.Action to avoid multiple of the same choice from a list""" + def __call__(self, parser, namespace, values, option_string=None): + unique_items = set(values) + setattr(namespace, self.dest, unique_items) + + def _add_output_subparser(subparsers): """Add the output subparser: manage.py output [subcommand] [options]""" output_usage = 'manage.py output [subcommand] [options]' @@ -196,6 +203,12 @@ def _add_metrics_subparser(subparsers): """Add the metrics subparser: manage.py metrics [options]""" metrics_usage = 'manage.py metrics [options]' + # get cluster choices from available files + clusters = [os.path.splitext(cluster)[0] for _, _, files + in os.walk('conf/clusters') for cluster in files] + + cluster_choices_block = ('\n').join('{:>28}{}'.format('', cluster) for cluster in clusters) + metrics_description = (""" StreamAlertCLI v{} Enable or disable metrics for all lambda functions. This toggles the creation of metric filters. @@ -204,13 +217,23 @@ -e/--enable Enable CloudWatch metrics through logging and metric filters -d/--disable Disable CloudWatch metrics through logging and metric filters + -f/--functions Space delimited list of functions to enable metrics for + Choices are: + rule + alert (not implemented) + athena (not implemented) --debug Enable Debug logger output +Optional Arguments: + + -c/--clusters Space delimited list of clusters to enable metrics for. 
If + omitted, this will enable metrics for all clusters. Choices are: +{} Examples: manage.py metrics --enable -""".format(version)) +""".format(version, cluster_choices_block)) metrics_parser = subparsers.add_parser( 'metrics', @@ -223,6 +246,16 @@ def _add_metrics_subparser(subparsers): # Set the name of this parser to 'metrics' metrics_parser.set_defaults(command='metrics') + # allow the user to select 1 or more functions to enable metrics for + metrics_parser.add_argument( + '-f', '--functions', + choices=['rule', 'alert', 'athena'], + help=ARGPARSE_SUPPRESS, + nargs='+', + action=UniqueSetAction, + required=True + ) + # get the metric toggle value toggle_group = metrics_parser.add_mutually_exclusive_group(required=True) @@ -238,6 +271,16 @@ def _add_metrics_subparser(subparsers): action='store_false' ) + # allow the user to select 0 or more clusters to enable metrics for + metrics_parser.add_argument( + '-c', '--clusters', + choices=clusters, + help=ARGPARSE_SUPPRESS, + nargs='+', + action=UniqueSetAction, + default=clusters + ) + # allow verbose output for the CLI with the --debug option metrics_parser.add_argument( '--debug', @@ -256,6 +299,12 @@ def _add_metric_alarm_subparser(subparsers): metric_choices_block = ('\n').join('{:>35}{}'.format('', metric) for metric in all_metrics) + # get cluster choices from available files + clusters = [os.path.splitext(cluster)[0] for _, _, files + in os.walk('conf/clusters') for cluster in files] + + cluster_choices_block = ('\n').join('{:>37}{}'.format('', cluster) for cluster in clusters) + metric_alarm_description = (""" StreamAlertCLI v{} Add a CloudWatch alarm for predefined metrics. These are save in the config and @@ -291,6 +340,8 @@ def _add_metric_alarm_subparser(subparsers): -ad/--alarm-description The description for the alarm -c/--clusters Space delimited list of clusters to apply this metric to. This is ignored if the --metric-target of 'aggregate' is used. 
+ Choices are: +{} -es/--extended-statistic The percentile statistic for the metric associated with the alarm. Specify a value between p0.0 and p100. Cannot be used in conjunction with the --statistic flag. @@ -330,7 +381,7 @@ def _add_metric_alarm_subparser(subparsers): AWS: https://docs.aws.amazon.com/AmazonCloudWatch/latest/APIReference/API_PutMetricAlarm.html Terraform: https://www.terraform.io/docs/providers/aws/r/cloudwatch_metric_alarm.html -""".format(version, metric_choices_block)) +""".format(version, metric_choices_block, cluster_choices_block)) metric_alarm_parser = subparsers.add_parser( 'create-alarm', @@ -442,16 +493,13 @@ def _alarm_description_validator(val): type=_alarm_description_validator ) - # get cluster choices from available files - clusters = [os.path.splitext(cluster)[0] for _, _, files - in os.walk('conf/clusters') for cluster in files] - # allow the user to select 0 or more clusters to apply this alarm to metric_alarm_parser.add_argument( '-c', '--clusters', choices=clusters, help=ARGPARSE_SUPPRESS, nargs='+', + action=UniqueSetAction, default=[] ) diff --git a/stream_alert_cli/config.py b/stream_alert_cli/config.py index 7142e17b6..e67c59a46 100644 --- a/stream_alert_cli/config.py +++ b/stream_alert_cli/config.py @@ -19,6 +19,7 @@ import re import sys +from stream_alert.shared import metrics from stream_alert_cli.helpers import continue_prompt from stream_alert_cli.logger import LOGGER_CLI @@ -128,13 +129,27 @@ def set_aws_account_id(self, aws_account_id): LOGGER_CLI.info('AWS Account ID successfully configured') - def toggle_metrics(self, enabled): - """Toggle CloudWatch metric logging and filter creation""" - metrics = self.config['global']['infrastructure'].get('metrics') - if not metrics: - self.config['global']['infrastructure']['metrics'] = {} + def toggle_metrics(self, enabled, clusters, lambda_functions): + """Toggle CloudWatch metric logging and filter creation - metrics['enabled'] = enabled + Args: + enabled (bool): False if 
disabling metrics, true if enable_logging + clusters (list): Clusters to enable or disable metrics on + lambda_functions (list): Which lambda functions to enable or disable + metrics on (rule, alert, or athena) + """ + for function in lambda_functions: + if function == 'athena': + if 'athena_partition_refresh_config' in self.config['lambda']: + self.config['lambda']['athena_partition_refresh_config'] \ + ['enable_metrics'] = enabled + else: + LOGGER_CLI.error('No Athena configuration found; please initialize first.') + continue + + for cluster in clusters: + self.config['clusters'][cluster]['modules']['stream_alert'] \ + ['{}_processor'.format(function)]['enable_metrics'] = enabled self.write() diff --git a/stream_alert_cli/runner.py b/stream_alert_cli/runner.py index 1a81b5170..235f264db 100644 --- a/stream_alert_cli/runner.py +++ b/stream_alert_cli/runner.py @@ -655,7 +655,7 @@ def _toggle_metrics(options): Args: options (argparser): Contains boolean necessary for toggling metrics """ - CONFIG.toggle_metrics(options.enable_metrics) + CONFIG.toggle_metrics(options.enable_metrics, options.clusters, options.functions) def _create_alarm(options): From 1857628817aa45c24bf395f500da6c83b3a42468 Mon Sep 17 00:00:00 2001 From: Ryan Deivert Date: Thu, 7 Sep 2017 15:07:28 -0700 Subject: [PATCH 09/15] [cli] adding class to normalize chosen functions to constant names --- manage.py | 28 ++++++++++++++++++++++++++-- stream_alert_cli/config.py | 10 ++++++---- 2 files changed, 32 insertions(+), 6 deletions(-) diff --git a/manage.py b/manage.py index 3d70f136e..aa1acc527 100755 --- a/manage.py +++ b/manage.py @@ -27,7 +27,12 @@ import os import re -from stream_alert.shared import metrics +from stream_alert.shared import ( + ALERT_PROCESSOR_NAME, + ATHENA_PARTITION_REFRESH_NAME, + metrics, + RULE_PROCESSOR_NAME +) from stream_alert_cli import __version__ as version from stream_alert_cli.logger import LOGGER_CLI from stream_alert_cli.runner import cli_runner @@ -40,6 +45,25 @@ def 
__call__(self, parser, namespace, values, option_string=None): setattr(namespace, self.dest, unique_items) +class NormalizeFunctionAction(UniqueSetAction): + """Subclass of argparse.Action -> UniqueSetAction that will return a unique set of + normalized lambda function names. + """ + def __call__(self, parser, namespace, values, option_string=None): + super(NormalizeFunctionAction, self).__call__(parser, namespace, values, option_string) + values = getattr(namespace, self.dest) + normalized_map = {'rule': RULE_PROCESSOR_NAME, + 'alert': ALERT_PROCESSOR_NAME, + 'athena': ATHENA_PARTITION_REFRESH_NAME} + + for func, normalize_func in normalized_map.iteritems(): + if func in values: + values.remove(func) + values.add(normalize_func) + + setattr(namespace, self.dest, values) + + def _add_output_subparser(subparsers): """Add the output subparser: manage.py output [subcommand] [options]""" output_usage = 'manage.py output [subcommand] [options]' @@ -252,7 +276,7 @@ def _add_metrics_subparser(subparsers): choices=['rule', 'alert', 'athena'], help=ARGPARSE_SUPPRESS, nargs='+', - action=UniqueSetAction, + action=NormalizeFunctionAction, required=True ) diff --git a/stream_alert_cli/config.py b/stream_alert_cli/config.py index e67c59a46..2a768d0a6 100644 --- a/stream_alert_cli/config.py +++ b/stream_alert_cli/config.py @@ -19,7 +19,10 @@ import re import sys -from stream_alert.shared import metrics +from stream_alert.shared import ( + ATHENA_PARTITION_REFRESH_NAME, + metrics +) from stream_alert_cli.helpers import continue_prompt from stream_alert_cli.logger import LOGGER_CLI @@ -41,7 +44,6 @@ def __getitem__(self, key): def __setitem__(self, key, new_value): self.config.__setitem__(key, new_value) - print 'setting', key self.write() def get(self, key): @@ -139,7 +141,7 @@ def toggle_metrics(self, enabled, clusters, lambda_functions): metrics on (rule, alert, or athena) """ for function in lambda_functions: - if function == 'athena': + if function == 
ATHENA_PARTITION_REFRESH_NAME: if 'athena_partition_refresh_config' in self.config['lambda']: self.config['lambda']['athena_partition_refresh_config'] \ ['enable_metrics'] = enabled @@ -149,7 +151,7 @@ def toggle_metrics(self, enabled, clusters, lambda_functions): for cluster in clusters: self.config['clusters'][cluster]['modules']['stream_alert'] \ - ['{}_processor'.format(function)]['enable_metrics'] = enabled + [function]['enable_metrics'] = enabled self.write() From 6492c3ff276be6c985e9b1047212f119ea1c29df Mon Sep 17 00:00:00 2001 From: Ryan Deivert Date: Thu, 7 Sep 2017 15:58:39 -0700 Subject: [PATCH 10/15] [cli][conf] updating code for saving metric alarms to respective cluster files * Metric alarms related to aggregate metrics get saved in the conf/global.json file * Metric alarms related to the athena function get saved in the conf/global.json file * Metric alarms related to the rule or alert processor functions get saved to each cluster file. This enables us to better organize metrics and allows the user to set different alarms for different clusters. 
--- manage.py | 13 +-- stream_alert_cli/config.py | 141 ++++++++++++++++++++----- stream_alert_cli/terraform_generate.py | 9 +- 3 files changed, 120 insertions(+), 43 deletions(-) diff --git a/manage.py b/manage.py index aa1acc527..8f2df74b4 100755 --- a/manage.py +++ b/manage.py @@ -27,12 +27,7 @@ import os import re -from stream_alert.shared import ( - ALERT_PROCESSOR_NAME, - ATHENA_PARTITION_REFRESH_NAME, - metrics, - RULE_PROCESSOR_NAME -) +from stream_alert.shared import metrics from stream_alert_cli import __version__ as version from stream_alert_cli.logger import LOGGER_CLI from stream_alert_cli.runner import cli_runner @@ -52,9 +47,9 @@ class NormalizeFunctionAction(UniqueSetAction): def __call__(self, parser, namespace, values, option_string=None): super(NormalizeFunctionAction, self).__call__(parser, namespace, values, option_string) values = getattr(namespace, self.dest) - normalized_map = {'rule': RULE_PROCESSOR_NAME, - 'alert': ALERT_PROCESSOR_NAME, - 'athena': ATHENA_PARTITION_REFRESH_NAME} + normalized_map = {'rule': metrics.RULE_PROCESSOR_NAME, + 'alert': metrics.ALERT_PROCESSOR_NAME, + 'athena': metrics.ATHENA_PARTITION_REFRESH_NAME} for func, normalize_func in normalized_map.iteritems(): if func in values: diff --git a/stream_alert_cli/config.py b/stream_alert_cli/config.py index 2a768d0a6..f646d8564 100644 --- a/stream_alert_cli/config.py +++ b/stream_alert_cli/config.py @@ -19,10 +19,7 @@ import re import sys -from stream_alert.shared import ( - ATHENA_PARTITION_REFRESH_NAME, - metrics -) +from stream_alert.shared import metrics from stream_alert_cli.helpers import continue_prompt from stream_alert_cli.logger import LOGGER_CLI @@ -141,7 +138,7 @@ def toggle_metrics(self, enabled, clusters, lambda_functions): metrics on (rule, alert, or athena) """ for function in lambda_functions: - if function == ATHENA_PARTITION_REFRESH_NAME: + if function == metrics.THENA_PARTITION_REFRESH_NAME: if 'athena_partition_refresh_config' in self.config['lambda']: 
self.config['lambda']['athena_partition_refresh_config'] \ ['enable_metrics'] = enabled @@ -155,39 +152,127 @@ def toggle_metrics(self, enabled, clusters, lambda_functions): self.write() + @staticmethod + def _add_metric_alarm_config(alarm_info, config, prompt_detail): + """Helper function to add the metric alarm to the respective config""" + metric_alarms = config.get('metric_alarms', {}) + if alarm_info['alarm_name'] in metric_alarms: + prompt = ('Alarm name \'{}\' already defined {}. Would you like ' + 'to overwrite?'.format(alarm_info['alarm_name'], prompt_detail)) + if not continue_prompt(prompt): + return False + + # Some keys that come from the argparse options can be omitted + omitted_keys = {'debug', 'alarm_name', 'command', 'clusters', 'metric_target'} + + metric_alarms[alarm_info['alarm_name']] = { + key: value for key, value in alarm_info.iteritems() + if key not in omitted_keys and value is not None + } + + config['metric_alarms'] = metric_alarms + + return True + + def _add_metric_alarm_per_cluster(self, alarm_info, function_name): + """Add a metric alarm for individual clusters""" + # If no clusters have been specified by the user, we can assume this alarm + # should be created for all available clusters, so fall back to that + clusters = (alarm_info['clusters'] if alarm_info['clusters'] else + list(self.config['clusters'])) + + # Go over each of the clusters and see if enable_metrics == True and prompt + # the user to toggle metrics on if this is False + for cluster in clusters: + function_config = (self.config['clusters'][cluster]['modules'] + ['stream_alert'][function_name]) + + if not function_config.get('enable_metrics'): + prompt = ('Metrics are not currently enabled for the \'{}\' function ' + 'within the \'{}\' cluster. 
Would you like to enable metrics ' + 'for this cluster?'.format(function_name, cluster)) + + if continue_prompt(prompt): + self.toggle_metrics(True, [cluster], [function_name]) + + elif not continue_prompt('Would you still like to add this alarm ' + 'even though metrics are disabled?'): + continue + + prompt_context = ('for the \'{}\' function in the \'{}\' ' + 'cluster'.format(function_name, cluster)) + + if self._add_metric_alarm_config(alarm_info, function_config, prompt_context): + LOGGER_CLI.info('Successfully added \'%s\' metric alarm for the \'%s\' ' + 'function to \'conf/clusters/%s.json.\'', + alarm_info['alarm_name'], function_name, cluster) + def add_metric_alarm(self, alarm_info): """Add a metric alarm that corresponds to a predefined metrics""" - metrics = self.config['global']['infrastructure'].get('metrics') - if not metrics: - self.config['global']['infrastructure']['metrics'] = {} - - # Check to see if metrics are enabled. If they are not, then alarms are useless. - enable_metrics = self.config['global']['infrastructure']['metrics'].get('enabled', False) + # Get the current metrics for each function + current_metrics = metrics.MetricLogger.get_available_metrics() + + # Extract the function name this metric is associated with + metric_function = {metric: function for function in current_metrics + for metric in current_metrics[function]}[alarm_info['metric']] + + # Do not continue if the user is trying to apply a metric alarm for an athena + # metric to a specific cluster (since the athena function operates on all clusters) + if (alarm_info['metric_target'] != 'aggregate' and + metric_function == metrics.ATHENA_PARTITION_REFRESH_NAME): + LOGGER_CLI.error('Metrics for the athena function can only be applied ' + 'to an aggregate metric target, not on a per-cluster basis.') + return - if not enable_metrics: - prompt = ('Metrics are not currently enabled in \'conf/global.json\'. Creating a ' - 'metric alarm will have no effect until metrics are enabled. 
' - 'Would you like to continue anyway?') + # If the metric is related to either the rule processor or alert processor, we should + # check to see if any cluster has metrics enabled for that function before continuing + if (metric_function in {metrics.ALERT_PROCESSOR_NAME, metrics.RULE_PROCESSOR_NAME} and + not any(self.config['clusters'][cluster]['modules']['stream_alert'] + [metric_function].get('enable_metrics') for cluster in + self.config['clusters'])): + prompt = ('Metrics are not currently enabled for the \'{}\' function ' + 'within any cluster. Creating an alarm will have no effect ' + 'until metrics are enabled for this function in at least one ' + 'cluster. Would you still like to continue?'.format(metric_function)) if not continue_prompt(prompt): return - current_alarms = self.config['global']['infrastructure']['metrics'].get('alarms', {}) - - if alarm_info['alarm_name'] in current_alarms: - prompt = ('Alarm name \'{}\' already defined. Would you ' - 'like to overwrite?').format(alarm_info['alarm_name']) - if not continue_prompt(prompt): + elif metric_function == metrics.ATHENA_PARTITION_REFRESH_NAME: + # If the user is attempting to add a metric for athena, make sure the athena + # function is initialized first + if 'athena_partition_refresh_config' not in self.config['lambda']: + LOGGER_CLI.error('No configuration found for Athena Partition Refresh. ' + 'Please run: `$ python manage.py athena init` first.') return - omitted_keys = {'debug', 'alarm_name', 'command'} + # If the athena function is initialized, but metrics are not enabled, ask + # the user if they would like to enable them now + if not self.config['lambda']['athena_partition_refresh_config'].get('enable_metrics'): + prompt = ('Metrics are not currently enabled for the \'athena\' function. 
' + 'Would you like to enable metrics for athena?') - current_alarms[alarm_info['alarm_name']] = { - key: value for key, value in alarm_info.iteritems() - if key not in omitted_keys and value is not None - } + if continue_prompt(prompt): + self.toggle_metrics(True, None, [metric_function]) + + elif not continue_prompt('Would you still like to add this alarm ' + 'even though metrics are disabled?'): + return + + # Add metric alarms for the aggregate metrics - these are added to the global config + if (alarm_info['metric_target'] == 'aggregate' or + metric_function == metrics.ATHENA_PARTITION_REFRESH_NAME): + global_config = self.config['global']['infrastructure']['monitoring'] + + prompt_context = 'in the aggregate alarms within \'conf/globals.json\'' + if self._add_metric_alarm_config(alarm_info, global_config, prompt_context): + LOGGER_CLI.info('Successfully added \'%s\' metric alarm to \'conf/global.json.\'', + alarm_info['alarm_name']) + + else: + # Add metric alarms on a per-cluster basis - these are added to the cluster config + self._add_metric_alarm_per_cluster(alarm_info, metric_function) - # Add the alarms to the config - self.config['global']['infrastructure']['metrics']['alarms'] = current_alarms + # Save all of the alarm updates to disk self.write() def load(self): diff --git a/stream_alert_cli/terraform_generate.py b/stream_alert_cli/terraform_generate.py index bfd269e8d..9ef8dac9d 100644 --- a/stream_alert_cli/terraform_generate.py +++ b/stream_alert_cli/terraform_generate.py @@ -228,7 +228,7 @@ def generate_stream_alert(cluster_name, cluster_dict, config): 'cluster': cluster_name, 'kms_key_arn': '${aws_kms_key.stream_alert_secrets.arn}', 'rule_processor_enable_metrics': modules['stream_alert'] \ - ['rule_processor']['enable_metrics'], + ['rule_processor'].get('enable_metrics', False), 'rule_processor_log_level': modules['stream_alert'] \ ['rule_processor'].get('log_level', 'info'), 'rule_processor_memory': 
modules['stream_alert']['rule_processor']['memory'], @@ -237,7 +237,7 @@ def generate_stream_alert(cluster_name, cluster_dict, config): 'rule_processor_config': '${var.rule_processor_config}', 'alert_processor_config': '${var.alert_processor_config}', 'alert_processor_enable_metrics': modules['stream_alert'] \ - ['alert_processor']['enable_metrics'], + ['alert_processor'].get('enable_metrics', False), 'alert_processor_log_level': modules['stream_alert'] \ ['alert_processor'].get('log_level', 'info'), 'alert_processor_memory': modules['stream_alert']['alert_processor']['memory'], @@ -283,15 +283,12 @@ def generate_stream_alert(cluster_name, cluster_dict, config): return True def generate_cloudwatch_log_metrics(cluster_name, cluster_dict, config): - """Add the CloudWatch Metric Filters module to the Terraform cluster dict. + """Add the CloudWatch Metric Filters information to the Terraform cluster dict. Args: cluster_name (str): The name of the currently generating cluster cluster_dict (defaultdict): The dict containing all Terraform config for a given cluster. 
config (dict): The loaded config from the 'conf/' directory - - Returns: - bool: Result of applying the cloudwatch metric filters to the stream_alert module """ stream_alert_config = config['clusters'][cluster_name]['modules']['stream_alert'] From f805cab0db2b793009d86ff3475c0e5efbf8af9e Mon Sep 17 00:00:00 2001 From: Ryan Deivert Date: Fri, 8 Sep 2017 09:20:16 -0700 Subject: [PATCH 11/15] [cli][metrics] adding hard check to prevent multiple alarms with the same name --- stream_alert_cli/config.py | 57 ++++++++++++++++++++++++++++++++++++-- 1 file changed, 55 insertions(+), 2 deletions(-) diff --git a/stream_alert_cli/config.py b/stream_alert_cli/config.py index f646d8564..40972b148 100644 --- a/stream_alert_cli/config.py +++ b/stream_alert_cli/config.py @@ -138,7 +138,7 @@ def toggle_metrics(self, enabled, clusters, lambda_functions): metrics on (rule, alert, or athena) """ for function in lambda_functions: - if function == metrics.THENA_PARTITION_REFRESH_NAME: + if function == metrics.ATHENA_PARTITION_REFRESH_NAME: if 'athena_partition_refresh_config' in self.config['lambda']: self.config['lambda']['athena_partition_refresh_config'] \ ['enable_metrics'] = enabled @@ -207,8 +207,61 @@ def _add_metric_alarm_per_cluster(self, alarm_info, function_name): 'function to \'conf/clusters/%s.json.\'', alarm_info['alarm_name'], function_name, cluster) + def _alarm_exists(self, alarm_name): + """Check if this alarm name is already used somewhere. CloudWatch alarm + names must be unique to an AWS account + + Args: + alarm_name (str): The name of the alarm being created + + Returns: + bool: True if the the alarm name is already present in the config + """ + message = ('CloudWatch metric alarm names must be unique ' + 'within each AWS account. 
Please remove this alarm ' + 'so it can be updated or choose another name.') + funcs = {metrics.ALERT_PROCESSOR_NAME, metrics.RULE_PROCESSOR_NAME} + for func in funcs: + for cluster in self.config['clusters']: + func_alarms = (self.config['clusters'][cluster]['modules'] + ['stream_alert'][func].get('metric_alarms', {})) + if alarm_name in func_alarms: + LOGGER_CLI.error('An alarm with name \'%s\' already exists in the ' + '\'conf/clusters/%s.json\' cluster. %s', alarm_name, + cluster, message) + return True + + global_config = self.config['global']['infrastructure'].get('monitoring') + if not global_config: + return False + + metric_alarms = global_config.get('metric_alarms') + if not metric_alarms: + return False + + # Check for athena metric alarms also, which are save in the global config + funcs.add(metrics.ATHENA_PARTITION_REFRESH_NAME) + + for func in funcs: + global_func_alarms = global_config['metric_alarms'].get(func, {}) + if alarm_name in global_func_alarms: + LOGGER_CLI.error('An alarm with name \'%s\' already exists in the ' + '\'conf/globals.json\'. %s', alarm_name, message) + return True + + return False + def add_metric_alarm(self, alarm_info): - """Add a metric alarm that corresponds to a predefined metrics""" + """Add a metric alarm that corresponds to a predefined metrics + + Args: + alarm_info (dict): All the necessary values needed to add a CloudWatch + metric alarm + """ + # Check to see if an alarm with this name already exists + if self._alarm_exists(alarm_info['alarm_name']): + return + # Get the current metrics for each function current_metrics = metrics.MetricLogger.get_available_metrics() From ac7f73cab7ecd435d69fcc501e3910e91afe57f5 Mon Sep 17 00:00:00 2001 From: Ryan Deivert Date: Fri, 8 Sep 2017 09:21:22 -0700 Subject: [PATCH 12/15] [cli] removing ability to use extended-statistics in alarms for now * Terraform is awful and supporting both statistic and extended-statistic programatically is much too difficult right now. 
* Updating the property name for `metric` to be `metric_name` to align with what tf expects --- manage.py | 47 ++++++++++++++++++++------------------ stream_alert_cli/config.py | 2 +- 2 files changed, 26 insertions(+), 23 deletions(-) diff --git a/manage.py b/manage.py index 8f2df74b4..837326d62 100755 --- a/manage.py +++ b/manage.py @@ -25,7 +25,6 @@ """ from argparse import Action, ArgumentParser, RawTextHelpFormatter, SUPPRESS as ARGPARSE_SUPPRESS import os -import re from stream_alert.shared import metrics from stream_alert_cli import __version__ as version @@ -361,12 +360,7 @@ def _add_metric_alarm_subparser(subparsers): ignored if the --metric-target of 'aggregate' is used. Choices are: {} - -es/--extended-statistic The percentile statistic for the metric associated with the alarm. - Specify a value between p0.0 and p100. Cannot be used in - conjunction with the --statistic flag. - -s/--statistic The statistic for the metric associated with the alarm, other than - percentile. For percentile statistics, use --extended-statistic. - Cannot be used in conjunction with the --extended-statistic flag + -s/--statistic The statistic for the metric associated with the alarm. 
Choices are: SampleCount Average @@ -418,6 +412,7 @@ def _add_metric_alarm_subparser(subparsers): metric_alarm_parser.add_argument( '-m', '--metric', choices=all_metrics, + dest='metric_name', help=ARGPARSE_SUPPRESS, required=True ) @@ -522,22 +517,30 @@ def _alarm_description_validator(val): default=[] ) - # get the extended statistic or statistic value - statistic_group = metric_alarm_parser.add_mutually_exclusive_group() - def _extended_stat_validator(val): - if not re.search(r'p(\d{1,2}(\.\d{0,2})?|100)$', val): - raise metric_alarm_parser.error('extended statistic values must start with \'p\' ' - 'and be followed by a percentage value (ie: p0.0, ' - 'p10, p55.5, p100)') - return val + ### Commenting out the below until we can support 'extended-statistic' metrics + ### alongside 'statistic' metrics. Currently only 'statistic' are supported + # # get the extended statistic or statistic value + # statistic_group = metric_alarm_parser.add_mutually_exclusive_group() + # def _extended_stat_validator(val): + # if not re.search(r'p(\d{1,2}(\.\d{0,2})?|100)$', val): + # raise metric_alarm_parser.error('extended statistic values must start with \'p\' ' + # 'and be followed by a percentage value (ie: p0.0, ' + # 'p10, p55.5, p100)') + # return val + # + # statistic_group.add_argument( + # '-es', '--extended-statistic', + # help=ARGPARSE_SUPPRESS, + # type=_extended_stat_validator + # ) + # + # statistic_group.add_argument( + # '-s', '--statistic', + # choices=['SampleCount', 'Average', 'Sum', 'Minimum', 'Maximum'], + # help=ARGPARSE_SUPPRESS + # ) - statistic_group.add_argument( - '-es', '--extended-statistic', - help=ARGPARSE_SUPPRESS, - type=_extended_stat_validator - ) - - statistic_group.add_argument( + metric_alarm_parser.add_argument( '-s', '--statistic', choices=['SampleCount', 'Average', 'Sum', 'Minimum', 'Maximum'], help=ARGPARSE_SUPPRESS diff --git a/stream_alert_cli/config.py b/stream_alert_cli/config.py index 40972b148..65ae0aac5 100644 --- 
a/stream_alert_cli/config.py +++ b/stream_alert_cli/config.py @@ -267,7 +267,7 @@ def add_metric_alarm(self, alarm_info): # Extract the function name this metric is associated with metric_function = {metric: function for function in current_metrics - for metric in current_metrics[function]}[alarm_info['metric']] + for metric in current_metrics[function]}[alarm_info['metric_name']] # Do not continue if the user is trying to apply a metric alarm for an athena # metric to a specific cluster (since the athena function operates on all clusters) From df99b965da0370d8608dcb475986080561f72c09 Mon Sep 17 00:00:00 2001 From: Ryan Deivert Date: Fri, 8 Sep 2017 09:25:54 -0700 Subject: [PATCH 13/15] [cli] updating how the cloudwatch alarms are written to the config to make it simpler * The terraform generate code now completely supports writing cloudwatch alarms for both aggregate metric alarms and alarms per cluster/function * Aggregate metric alarms are written to the `main.tf` file to be published. * Per-cluster/function metric alarms are done via the `stream_alert` tf module.
--- conf/clusters/prod.json | 2 +- manage.py | 2 +- stream_alert_cli/config.py | 61 ++++++--- stream_alert_cli/terraform_generate.py | 128 ++++++++++++++++-- stream_alert_cli/test.py | 2 +- terraform/modules/tf_stream_alert/main.tf | 18 +++ .../modules/tf_stream_alert/variables.tf | 7 + 7 files changed, 187 insertions(+), 33 deletions(-) diff --git a/conf/clusters/prod.json b/conf/clusters/prod.json index d8bf99796..fc50ba8fa 100644 --- a/conf/clusters/prod.json +++ b/conf/clusters/prod.json @@ -32,7 +32,7 @@ "enable_metrics": true, "log_level": "info", "memory": 128, - "timeout": 30 + "timeout": 10 } } }, diff --git a/manage.py b/manage.py index 837326d62..a8c5bddcd 100755 --- a/manage.py +++ b/manage.py @@ -249,7 +249,7 @@ def _add_metrics_subparser(subparsers): {} Examples: - manage.py metrics --enable + manage.py metrics --enable --functions rule """.format(version, cluster_choices_block)) diff --git a/stream_alert_cli/config.py b/stream_alert_cli/config.py index 65ae0aac5..f18fbda6c 100644 --- a/stream_alert_cli/config.py +++ b/stream_alert_cli/config.py @@ -153,29 +153,37 @@ def toggle_metrics(self, enabled, clusters, lambda_functions): self.write() @staticmethod - def _add_metric_alarm_config(alarm_info, config, prompt_detail): - """Helper function to add the metric alarm to the respective config""" - metric_alarms = config.get('metric_alarms', {}) - if alarm_info['alarm_name'] in metric_alarms: - prompt = ('Alarm name \'{}\' already defined {}. 
Would you like ' - 'to overwrite?'.format(alarm_info['alarm_name'], prompt_detail)) - if not continue_prompt(prompt): - return False + def _add_metric_alarm_config(alarm_info, current_alarms): + """Helper function to add the metric alarm to the respective config + + Args: + alarm_info (dict): All the necessary values needed to add a CloudWatch + metric alarm + current_alarms (dict): All of the current metric alarms from the config + Returns: + dict: The new metric alarms dictionary with the added metric alarm + """ # Some keys that come from the argparse options can be omitted omitted_keys = {'debug', 'alarm_name', 'command', 'clusters', 'metric_target'} - metric_alarms[alarm_info['alarm_name']] = { + current_alarms[alarm_info['alarm_name']] = { key: value for key, value in alarm_info.iteritems() - if key not in omitted_keys and value is not None + if key not in omitted_keys } - config['metric_alarms'] = metric_alarms - - return True + return current_alarms def _add_metric_alarm_per_cluster(self, alarm_info, function_name): - """Add a metric alarm for individual clusters""" + """Add a metric alarm for individual clusters. This is for non-aggregate + CloudWatch metric alarms. + + Args: + alarm_info (dict): All the necessary values needed to add a CloudWatch + metric alarm. + function_name (str): The name of the lambda function this metric is + related to. 
+ """ # If no clusters have been specified by the user, we can assume this alarm # should be created for all available clusters, so fall back to that clusters = (alarm_info['clusters'] if alarm_info['clusters'] else @@ -199,10 +207,10 @@ def _add_metric_alarm_per_cluster(self, alarm_info, function_name): 'even though metrics are disabled?'): continue - prompt_context = ('for the \'{}\' function in the \'{}\' ' - 'cluster'.format(function_name, cluster)) - - if self._add_metric_alarm_config(alarm_info, function_config, prompt_context): + metric_alarms = function_config.get('metric_alarms', {}) + new_alarms = self._add_metric_alarm_config(alarm_info, metric_alarms) + if new_alarms != False: + function_config['metric_alarms'] = new_alarms LOGGER_CLI.info('Successfully added \'%s\' metric alarm for the \'%s\' ' 'function to \'conf/clusters/%s.json.\'', alarm_info['alarm_name'], function_name, cluster) @@ -316,10 +324,19 @@ def add_metric_alarm(self, alarm_info): metric_function == metrics.ATHENA_PARTITION_REFRESH_NAME): global_config = self.config['global']['infrastructure']['monitoring'] - prompt_context = 'in the aggregate alarms within \'conf/globals.json\'' - if self._add_metric_alarm_config(alarm_info, global_config, prompt_context): - LOGGER_CLI.info('Successfully added \'%s\' metric alarm to \'conf/global.json.\'', - alarm_info['alarm_name']) + metric_alarms = global_config.get('metric_alarms', {}) + if not metric_alarms: + global_config['metric_alarms'] = {} + + metric_alarms = global_config['metric_alarms'].get(metric_function, {}) + if not metric_alarms: + global_config['metric_alarms'][metric_function] = {} + + new_alarms = self._add_metric_alarm_config(alarm_info, metric_alarms) + if new_alarms != False: + global_config['metric_alarms'][metric_function] = new_alarms + LOGGER_CLI.info('Successfully added \'%s\' metric alarm to ' + '\'conf/global.json.\'', alarm_info['alarm_name']) else: # Add metric alarms on a per-cluster basis - these are added to the 
cluster config diff --git a/stream_alert_cli/terraform_generate.py b/stream_alert_cli/terraform_generate.py index 9ef8dac9d..be42ffe81 100644 --- a/stream_alert_cli/terraform_generate.py +++ b/stream_alert_cli/terraform_generate.py @@ -16,6 +16,7 @@ from collections import defaultdict import json import os +import string from stream_alert.shared import metrics from stream_alert_cli.logger import LOGGER_CLI @@ -23,6 +24,10 @@ DEFAULT_SNS_MONITORING_TOPIC = 'stream_alert_monitoring' RESTRICTED_CLUSTER_NAMES = ('main', 'athena') +# The FUNC_PREFIXES dict acts as a simple map to a human-readable name +FUNC_PREFIXES = {metrics.ALERT_PROCESSOR_NAME: 'AlertProcessor', + metrics.RULE_PROCESSOR_NAME: 'RuleProcessor'} + class InvalidClusterName(Exception): """Exception for invalid cluster names""" @@ -165,6 +170,36 @@ def generate_main(**kwargs): 'name': DEFAULT_SNS_MONITORING_TOPIC } + # Add any global cloudwatch alarms to the main.tf + monitoring_config = config['global']['infrastructure'].get('monitoring') + if not monitoring_config: + return main_dict + + global_metrics = monitoring_config.get('metric_alarms') + if not global_metrics: + return main_dict + + formatted_alarms = {} + # Add global metric alarms for the rule and alert processors + for func in FUNC_PREFIXES: + if not func in global_metrics: + continue + + for name, settings in global_metrics[func].iteritems(): + alarm_info = settings.copy() + alarm_info['alarm_name'] = name + alarm_info['namespace'] = 'StreamAlert' + alarm_info['alarm_actions'] = ['${{aws_sns_topic.{}.arn}}'.format( + DEFAULT_SNS_MONITORING_TOPIC + )] + # Terraform only allows certain characters in resource names, so strip the name + acceptable_chars = ''.join([string.digits, string.letters, '_-']) + name = filter(acceptable_chars.__contains__, name) + formatted_alarms['metric_alarm_{}'.format(name)] = alarm_info + + if formatted_alarms: + main_dict['resource']['aws_cloudwatch_metric_alarm'] = formatted_alarms + return main_dict @@ -282,7 
+317,8 @@ def generate_stream_alert(cluster_name, cluster_dict, config): return True -def generate_cloudwatch_log_metrics(cluster_name, cluster_dict, config): + +def generate_cloudwatch_metric_filters(cluster_name, cluster_dict, config): """Add the CloudWatch Metric Filters information to the Terraform cluster dict. Args: @@ -295,11 +331,7 @@ def generate_cloudwatch_log_metrics(cluster_name, cluster_dict, config): current_metrics = metrics.MetricLogger.get_available_metrics() # Add metric filters for the rule and alert processor - # The funcs dict acts as a simple map to a human-readable name - funcs = {metrics.ALERT_PROCESSOR_NAME: 'AlertProcessor', - metrics.RULE_PROCESSOR_NAME: 'RuleProcessor'} - - for func in funcs: + for func in FUNC_PREFIXES: if func not in current_metrics: continue @@ -312,7 +344,7 @@ def generate_cloudwatch_log_metrics(cluster_name, cluster_dict, config): if not stream_alert_config[func].get('enable_metrics'): continue - metric_prefix = funcs[func] + metric_prefix = FUNC_PREFIXES[func] filter_pattern_idx, filter_value_idx = 0, 1 # Add filters for the cluster and aggregate @@ -335,6 +367,84 @@ def generate_cloudwatch_log_metrics(cluster_name, cluster_dict, config): ['{}_metric_filters'.format(func)] = filters +def _format_metric_alarm(name, alarm_settings, function, cluster=''): + """Helper function to format a metric alarm as a comma-separated string + + Args: + name (str): + alarm_info (dict): + function (str): + cluster (str): + + Returns: + str: formatted and comma-separated string containing alarm settings + """ + alarm_info = alarm_settings.copy() + # The alarm description and name can potentially have commas so remove them + alarm_info['alarm_description'] = (alarm_info['alarm_description'].replace(',', '') if + alarm_info['alarm_description'] else '') + + # Prepend a prefix for this function to the metric name and append a cluster + # name if there is one available + alarm_info['metric_name'] = '{}-{}'.format(function, 
alarm_info['metric_name']) + if cluster: + alarm_info['metric_name'] = '{}-{}'.format(alarm_info['metric_name'], cluster.upper()) + + attributes = list(alarm_info) + attributes.sort() + sorted_values = [str(alarm_info[attribute]) if alarm_info[attribute] + else '' for attribute in attributes] + + sorted_values.insert(0, name.replace(',', '')) + + return ','.join(sorted_values) + + +def generate_cloudwatch_metric_alarms(cluster_name, cluster_dict, config): + """Add the CloudWatch Metric Alarms information to the Terraform cluster dict. + + Args: + cluster_name (str): The name of the currently generating cluster + cluster_dict (defaultdict): The dict containing all Terraform config for a given cluster. + config (dict): The loaded config from the 'conf/' directory + """ + infrastructure_config = config['global'].get('infrastructure') + + if not (infrastructure_config and 'monitoring' in infrastructure_config): + LOGGER_CLI.error('Invalid config: Make sure you declare global infrastructure options!') + return + + topic_name = DEFAULT_SNS_MONITORING_TOPIC if infrastructure_config \ + ['monitoring'].get('create_sns_topic') else \ + infrastructure_config['monitoring'].get('sns_topic_name') + + sns_topic_arn = 'arn:aws:sns:{region}:{account_id}:{topic}'.format( + region=config['global']['account']['region'], + account_id=config['global']['account']['aws_account_id'], + topic=topic_name + ) + + cluster_dict['module']['stream_alert_{}'.format(cluster_name)] \ + ['sns_topic_arn'] = sns_topic_arn + + stream_alert_config = config['clusters'][cluster_name]['modules']['stream_alert'] + + # Add cluster metric alarms for the rule and alert processors + formatted_alarms = [] + for func, func_config in stream_alert_config.iteritems(): + if 'metric_alarms' not in func_config: + continue + + metric_alarms = func_config['metric_alarms'] + for name, alarm_info in metric_alarms.iteritems(): + formatted_alarms.append( + _format_metric_alarm(name, alarm_info, FUNC_PREFIXES[func], 
cluster_name) + ) + + cluster_dict['module']['stream_alert_{}'.format(cluster_name)] \ + ['metric_alarms'] = formatted_alarms + + def generate_cloudwatch_monitoring(cluster_name, cluster_dict, config): """Add the CloudWatch Monitoring module to the Terraform cluster dict. @@ -613,7 +723,9 @@ def generate_cluster(**kwargs): if not generate_stream_alert(cluster_name, cluster_dict, config): return - generate_cloudwatch_log_metrics(cluster_name, cluster_dict, config) + generate_cloudwatch_metric_filters(cluster_name, cluster_dict, config) + + generate_cloudwatch_metric_alarms(cluster_name, cluster_dict, config) if modules['cloudwatch_monitoring']['enabled']: if not generate_cloudwatch_monitoring(cluster_name, cluster_dict, config): diff --git a/stream_alert_cli/test.py b/stream_alert_cli/test.py index c083ba5f1..b20ff8505 100644 --- a/stream_alert_cli/test.py +++ b/stream_alert_cli/test.py @@ -191,7 +191,7 @@ def _run_rule_tests(self, rule_name, test_record, formatted_record, print_header # Print rule name for section header, but only if we get # to a point where there is a record to actually be tested. 
- # This avoids potentialy blank sections + # This avoids potentially blank sections if print_header_line and (alerts or self.print_output): print '\n{}'.format(rule_name) diff --git a/terraform/modules/tf_stream_alert/main.tf b/terraform/modules/tf_stream_alert/main.tf index 8d2527de9..553816422 100644 --- a/terraform/modules/tf_stream_alert/main.tf +++ b/terraform/modules/tf_stream_alert/main.tf @@ -180,3 +180,21 @@ resource "aws_cloudwatch_log_metric_filter" "alert_processor_cw_metric_filters" value = "${element(split(",", var.alert_processor_metric_filters[count.index]), 2)}" } } + +// CloudWatch metric alarms that are created per-cluster +// The split list is our way around poor tf support for lists of maps and is made up of: +// , , , , +// , , , +resource "aws_cloudwatch_metric_alarm" "cw_metric_alarms" { + count = "${length(var.metric_alarms)}" + alarm_name = "${element(split(",", var.metric_alarms[count.index]), 0)}" + alarm_description = "${element(split(",", var.metric_alarms[count.index]), 1)}" + comparison_operator = "${element(split(",", var.metric_alarms[count.index]), 2)}" + evaluation_periods = "${element(split(",", var.metric_alarms[count.index]), 3)}" + metric_name = "${element(split(",", var.metric_alarms[count.index]), 4)}" + period = "${element(split(",", var.metric_alarms[count.index]), 5)}" + statistic = "${element(split(",", var.metric_alarms[count.index]), 6)}" + threshold = "${element(split(",", var.metric_alarms[count.index]), 7)}" + namespace = "${var.namespace}" + alarm_actions = ["${var.sns_topic_arn}"] +} diff --git a/terraform/modules/tf_stream_alert/variables.tf b/terraform/modules/tf_stream_alert/variables.tf index 9b06cd835..ec8e832c3 100644 --- a/terraform/modules/tf_stream_alert/variables.tf +++ b/terraform/modules/tf_stream_alert/variables.tf @@ -97,7 +97,14 @@ variable "alert_processor_metric_filters" { default = [] } +variable "metric_alarms" { + type = "list" + default = [] +} + variable "namespace" { type = "string" 
default = "StreamAlert" } + +variable "sns_topic_arn" {} From 6a45c873265dc2ebb736b1f6d3b8c47c28fee8a9 Mon Sep 17 00:00:00 2001 From: Ryan Deivert Date: Fri, 8 Sep 2017 11:11:01 -0700 Subject: [PATCH 14/15] [cli] fixing bug with sns topic and proper aggregate metric name * Updating unit test * Migrating a constant dict to metrics package to be shared * Various linting --- manage.py | 6 ++- stream_alert/shared/metrics.py | 6 +++ stream_alert_cli/config.py | 21 ++++++-- stream_alert_cli/terraform_generate.py | 53 +++++++++---------- .../test_terraform_generate.py | 5 +- 5 files changed, 55 insertions(+), 36 deletions(-) diff --git a/manage.py b/manage.py index a8c5bddcd..19a9b17a0 100755 --- a/manage.py +++ b/manage.py @@ -504,7 +504,8 @@ def _alarm_description_validator(val): metric_alarm_parser.add_argument( '-ad', '--alarm-description', help=ARGPARSE_SUPPRESS, - type=_alarm_description_validator + type=_alarm_description_validator, + default='' ) # allow the user to select 0 or more clusters to apply this alarm to @@ -543,7 +544,8 @@ def _alarm_description_validator(val): metric_alarm_parser.add_argument( '-s', '--statistic', choices=['SampleCount', 'Average', 'Sum', 'Minimum', 'Maximum'], - help=ARGPARSE_SUPPRESS + help=ARGPARSE_SUPPRESS, + default='' ) # allow verbose output for the CLI with the --debug option diff --git a/stream_alert/shared/metrics.py b/stream_alert/shared/metrics.py index f79c74f03..e0aaa60d6 100644 --- a/stream_alert/shared/metrics.py +++ b/stream_alert/shared/metrics.py @@ -24,6 +24,12 @@ CLUSTER = os.environ.get('CLUSTER', 'unknown_cluster') +# The FUNC_PREFIXES dict acts as a simple map to a human-readable name +# Add ATHENA_PARTITION_REFRESH_NAME: 'AthenaPartitionRefresh', to the +# below when metrics are supported there +FUNC_PREFIXES = {ALERT_PROCESSOR_NAME: 'AlertProcessor', + RULE_PROCESSOR_NAME: 'RuleProcessor'} + try: ENABLE_METRICS = bool(int(os.environ.get('ENABLE_METRICS', 0))) except ValueError as err: diff --git
a/stream_alert_cli/config.py b/stream_alert_cli/config.py index f18fbda6c..fd2108a03 100644 --- a/stream_alert_cli/config.py +++ b/stream_alert_cli/config.py @@ -208,12 +208,20 @@ def _add_metric_alarm_per_cluster(self, alarm_info, function_name): continue metric_alarms = function_config.get('metric_alarms', {}) - new_alarms = self._add_metric_alarm_config(alarm_info, metric_alarms) + + # Format the metric name for the cluster based metric + # Prepend a prefix for this function and append the cluster name + alarm_settings = alarm_info.copy() + alarm_settings['metric_name'] = '{}-{}-{}'.format(metrics.FUNC_PREFIXES[function_name], + alarm_settings['metric_name'], + cluster.upper()) + + new_alarms = self._add_metric_alarm_config(alarm_settings, metric_alarms) if new_alarms != False: function_config['metric_alarms'] = new_alarms LOGGER_CLI.info('Successfully added \'%s\' metric alarm for the \'%s\' ' 'function to \'conf/clusters/%s.json.\'', - alarm_info['alarm_name'], function_name, cluster) + alarm_settings['alarm_name'], function_name, cluster) def _alarm_exists(self, alarm_name): """Check if this alarm name is already used somewhere. 
CloudWatch alarm @@ -332,11 +340,16 @@ def add_metric_alarm(self, alarm_info): if not metric_alarms: global_config['metric_alarms'][metric_function] = {} - new_alarms = self._add_metric_alarm_config(alarm_info, metric_alarms) + # Format the metric name for the aggregate metric + alarm_settings = alarm_info.copy() + alarm_settings['metric_name'] = '{}-{}'.format(metrics.FUNC_PREFIXES[metric_function], + alarm_info['metric_name']) + + new_alarms = self._add_metric_alarm_config(alarm_settings, metric_alarms) if new_alarms != False: global_config['metric_alarms'][metric_function] = new_alarms LOGGER_CLI.info('Successfully added \'%s\' metric alarm to ' - '\'conf/global.json.\'', alarm_info['alarm_name']) + '\'conf/global.json.\'', alarm_settings['alarm_name']) else: # Add metric alarms on a per-cluster basis - these are added to the cluster config diff --git a/stream_alert_cli/terraform_generate.py b/stream_alert_cli/terraform_generate.py index be42ffe81..e7ea6683f 100644 --- a/stream_alert_cli/terraform_generate.py +++ b/stream_alert_cli/terraform_generate.py @@ -24,10 +24,6 @@ DEFAULT_SNS_MONITORING_TOPIC = 'stream_alert_monitoring' RESTRICTED_CLUSTER_NAMES = ('main', 'athena') -# The FUNC_PREFIXES dict acts as a simple map to a human-readable name -FUNC_PREFIXES = {metrics.ALERT_PROCESSOR_NAME: 'AlertProcessor', - metrics.RULE_PROCESSOR_NAME: 'RuleProcessor'} - class InvalidClusterName(Exception): """Exception for invalid cluster names""" @@ -179,9 +175,19 @@ def generate_main(**kwargs): if not global_metrics: return main_dict + topic_name = (DEFAULT_SNS_MONITORING_TOPIC if infrastructure_config + ['monitoring'].get('create_sns_topic') else + infrastructure_config['monitoring'].get('sns_topic_name')) + + sns_topic_arn = 'arn:aws:sns:{region}:{account_id}:{topic}'.format( + region=config['global']['account']['region'], + account_id=config['global']['account']['aws_account_id'], + topic=topic_name + ) + formatted_alarms = {} # Add global metric alarms for the rule and 
alert processors - for func in FUNC_PREFIXES: + for func in metrics.FUNC_PREFIXES: if not func in global_metrics: continue @@ -189,9 +195,7 @@ def generate_main(**kwargs): alarm_info = settings.copy() alarm_info['alarm_name'] = name alarm_info['namespace'] = 'StreamAlert' - alarm_info['alarm_actions'] = ['${{aws_sns_topic.{}.arn}}'.format( - DEFAULT_SNS_MONITORING_TOPIC - )] + alarm_info['alarm_actions'] = [sns_topic_arn] # Terraform only allows certain characters in resource names, so strip the name acceptable_chars = ''.join([string.digits, string.letters, '_-']) name = filter(acceptable_chars.__contains__, name) @@ -331,7 +335,7 @@ def generate_cloudwatch_metric_filters(cluster_name, cluster_dict, config): current_metrics = metrics.MetricLogger.get_available_metrics() # Add metric filters for the rule and alert processor - for func in FUNC_PREFIXES: + for func, metric_prefix in metrics.FUNC_PREFIXES.iteritems(): if func not in current_metrics: continue @@ -344,7 +348,6 @@ def generate_cloudwatch_metric_filters(cluster_name, cluster_dict, config): if not stream_alert_config[func].get('enable_metrics'): continue - metric_prefix = FUNC_PREFIXES[func] filter_pattern_idx, filter_value_idx = 0, 1 # Add filters for the cluster and aggregate @@ -367,28 +370,22 @@ def generate_cloudwatch_metric_filters(cluster_name, cluster_dict, config): ['{}_metric_filters'.format(func)] = filters -def _format_metric_alarm(name, alarm_settings, function, cluster=''): +def _format_metric_alarm(name, alarm_settings): """Helper function to format a metric alarm as a comma-separated string Args: - name (str): - alarm_info (dict): - function (str): - cluster (str): + name (str): The name of the alarm to create + alarm_info (dict): All other settings for this alarm (threshold, etc) + function (str): The respective function this alarm is being created for. 
+ This is the RuleProcessor or AlertProcessor + cluster (str): The cluster that this metric is related to Returns: str: formatted and comma-separated string containing alarm settings """ alarm_info = alarm_settings.copy() # The alarm description and name can potentially have commas so remove them - alarm_info['alarm_description'] = (alarm_info['alarm_description'].replace(',', '') if - alarm_info['alarm_description'] else '') - - # Prepend a prefix for this function to the metric name and append a cluster - # name if there is one available - alarm_info['metric_name'] = '{}-{}'.format(function, alarm_info['metric_name']) - if cluster: - alarm_info['metric_name'] = '{}-{}'.format(alarm_info['metric_name'], cluster.upper()) + alarm_info['alarm_description'] = alarm_info['alarm_description'].replace(',', '') attributes = list(alarm_info) attributes.sort() @@ -414,9 +411,9 @@ def generate_cloudwatch_metric_alarms(cluster_name, cluster_dict, config): LOGGER_CLI.error('Invalid config: Make sure you declare global infrastructure options!') return - topic_name = DEFAULT_SNS_MONITORING_TOPIC if infrastructure_config \ - ['monitoring'].get('create_sns_topic') else \ - infrastructure_config['monitoring'].get('sns_topic_name') + topic_name = (DEFAULT_SNS_MONITORING_TOPIC if infrastructure_config + ['monitoring'].get('create_sns_topic') else + infrastructure_config['monitoring'].get('sns_topic_name')) sns_topic_arn = 'arn:aws:sns:{region}:{account_id}:{topic}'.format( region=config['global']['account']['region'], @@ -431,14 +428,14 @@ def generate_cloudwatch_metric_alarms(cluster_name, cluster_dict, config): # Add cluster metric alarms for the rule and alert processors formatted_alarms = [] - for func, func_config in stream_alert_config.iteritems(): + for func_config in stream_alert_config.values(): if 'metric_alarms' not in func_config: continue metric_alarms = func_config['metric_alarms'] for name, alarm_info in metric_alarms.iteritems(): formatted_alarms.append( - 
_format_metric_alarm(name, alarm_info, FUNC_PREFIXES[func], cluster_name) + _format_metric_alarm(name, alarm_info) ) cluster_dict['module']['stream_alert_{}'.format(cluster_name)] \ diff --git a/tests/unit/stream_alert_cli/test_terraform_generate.py b/tests/unit/stream_alert_cli/test_terraform_generate.py index 58dd8b832..a3966929b 100644 --- a/tests/unit/stream_alert_cli/test_terraform_generate.py +++ b/tests/unit/stream_alert_cli/test_terraform_generate.py @@ -585,14 +585,15 @@ def test_generate_athena(self): 'prefix': 'unit-testing' }, 'infrastructure': { - 'metrics': { - 'enabled': True + 'monitoring': { + 'create_sns_topic': True } } }, 'lambda': { 'athena_partition_refresh_config': { 'enabled': True, + 'enable_metrics': True, 'current_version': '$LATEST', 'refresh_type': { 'repair_hive_table': { From e427d16695b92f2a9754031e1637175720f1ab57 Mon Sep 17 00:00:00 2001 From: Ryan Deivert Date: Fri, 8 Sep 2017 16:43:44 -0700 Subject: [PATCH 15/15] [pr] addressing feedback from @austinbyers --- manage.py | 2 +- stream_alert_cli/terraform_generate.py | 2 ++ terraform/modules/tf_stream_alert/main.tf | 1 + 3 files changed, 4 insertions(+), 1 deletion(-) diff --git a/manage.py b/manage.py index 19a9b17a0..44a04e43c 100755 --- a/manage.py +++ b/manage.py @@ -404,7 +404,7 @@ def _add_metric_alarm_subparser(subparsers): help=ARGPARSE_SUPPRESS ) - # Set the name of this parser to 'validate-schemas' + # Set the name of this parser to 'create-alarm' metric_alarm_parser.set_defaults(command='create-alarm') # add all the required parameters diff --git a/stream_alert_cli/terraform_generate.py b/stream_alert_cli/terraform_generate.py index e7ea6683f..0b00c6dad 100644 --- a/stream_alert_cli/terraform_generate.py +++ b/stream_alert_cli/terraform_generate.py @@ -432,6 +432,8 @@ def generate_cloudwatch_metric_alarms(cluster_name, cluster_dict, config): if 'metric_alarms' not in func_config: continue + # TODO: update this logic to simply use a list of maps once Terraform fixes + # 
their support for this, instead of the comma-separated string this creates metric_alarms = func_config['metric_alarms'] for name, alarm_info in metric_alarms.iteritems(): formatted_alarms.append( diff --git a/terraform/modules/tf_stream_alert/main.tf b/terraform/modules/tf_stream_alert/main.tf index 553816422..cd41bbb69 100644 --- a/terraform/modules/tf_stream_alert/main.tf +++ b/terraform/modules/tf_stream_alert/main.tf @@ -185,6 +185,7 @@ resource "aws_cloudwatch_log_metric_filter" "alert_processor_cw_metric_filters" // The split list is our way around poor tf support for lists of maps and is made up of: // , , , , // , , , +// TODO: update this logic to simply use a variable that is a list of maps once Terraform fixes this resource "aws_cloudwatch_metric_alarm" "cw_metric_alarms" { count = "${length(var.metric_alarms)}" alarm_name = "${element(split(",", var.metric_alarms[count.index]), 0)}"