diff --git a/conf/clusters/prod.json b/conf/clusters/prod.json index 2b66c9469..fc50ba8fa 100644 --- a/conf/clusters/prod.json +++ b/conf/clusters/prod.json @@ -29,6 +29,7 @@ }, "rule_processor": { "current_version": "$LATEST", + "enable_metrics": true, "log_level": "info", "memory": 128, "timeout": 10 diff --git a/conf/global.json b/conf/global.json index b7b743d4a..79952fda2 100644 --- a/conf/global.json +++ b/conf/global.json @@ -6,9 +6,6 @@ "region": "us-east-1" }, "infrastructure": { - "metrics": { - "enabled": true - }, "monitoring": { "create_sns_topic": true } diff --git a/docs/source/athena-deploy.rst b/docs/source/athena-deploy.rst index 999b766fc..0213ad529 100644 --- a/docs/source/athena-deploy.rst +++ b/docs/source/athena-deploy.rst @@ -55,6 +55,7 @@ Open ``conf/lambda.json``, and fill in the following ``Required`` options: Key Required Default Description ----------------------------------- -------- -------------------- ----------- ``enabled`` ``Yes`` ``true`` Enables/Disables the Athena Partition Refresh Lambda function +``enable_metrics`` ``No`` ``false`` Enables/Disables logging of metrics for the Athena Partition Refresh Lambda function ``log_level`` ``No`` ``info`` The log level for the Lambda function, can be either ``info`` or ``debug``. Debug will help with diagnosing errors with polling SQS or sending Athena queries. ``memory`` ``No`` ``128`` The amount of memory (in MB) allocated to the Lambda function ``timeout`` ``No`` ``60`` The maximum duration of the Lambda function (in seconds) @@ -70,6 +71,7 @@ Key Required Default Descriptio { "athena_partition_refresh_config": { "enabled": true, + "enable_metrics": false, "log_level": "info", "memory": 128, "refresh_type": { diff --git a/manage.py b/manage.py index d911443ee..44a04e43c 100755 --- a/manage.py +++ b/manage.py @@ -23,14 +23,41 @@ terraform -var-file=../terraform.tfvars -var-file=../variables.json """ -from argparse import ArgumentParser, RawTextHelpFormatter, SUPPRESS as ARGPARSE_SUPPRESS +from argparse import Action, ArgumentParser, RawTextHelpFormatter, SUPPRESS as ARGPARSE_SUPPRESS import os +from stream_alert.shared import metrics from stream_alert_cli import __version__ as version from stream_alert_cli.logger import LOGGER_CLI from stream_alert_cli.runner import cli_runner +class UniqueSetAction(Action): + """Subclass of argparse.Action to avoid multiple of the same choice from a list""" + def __call__(self, parser, namespace, values, option_string=None): + unique_items = set(values) + setattr(namespace, self.dest, unique_items) + + +class NormalizeFunctionAction(UniqueSetAction): + """Subclass of argparse.Action -> UniqueSetAction that will return a unique set of + normalized lambda function names. 
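As a standalone illustration (not part of the patch), the `UniqueSetAction` helper above simply dedupes whatever values argparse hands it, and `NormalizeFunctionAction` then maps the short names onto the function-name constants in `stream_alert.shared.metrics`. A minimal sketch of the dedup behaviour:

```python
from argparse import Action, ArgumentParser

class UniqueSetAction(Action):
    """Collapse repeated CLI values into a unique set (mirrors the class above)"""
    def __call__(self, parser, namespace, values, option_string=None):
        setattr(namespace, self.dest, set(values))

parser = ArgumentParser()
parser.add_argument('-f', '--functions', nargs='+', action=UniqueSetAction)

# Duplicate values collapse into a set, e.g. {'rule', 'alert'}
print(parser.parse_args(['-f', 'rule', 'rule', 'alert']).functions)
```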
+ """ + def __call__(self, parser, namespace, values, option_string=None): + super(NormalizeFunctionAction, self).__call__(parser, namespace, values, option_string) + values = getattr(namespace, self.dest) + normalized_map = {'rule': metrics.RULE_PROCESSOR_NAME, + 'alert': metrics.ALERT_PROCESSOR_NAME, + 'athena': metrics.ATHENA_PARTITION_REFRESH_NAME} + + for func, normalize_func in normalized_map.iteritems(): + if func in values: + values.remove(func) + values.add(normalize_func) + + setattr(namespace, self.dest, values) + + def _add_output_subparser(subparsers): """Add the output subparser: manage.py output [subcommand] [options]""" output_usage = 'manage.py output [subcommand] [options]' @@ -113,9 +140,8 @@ def _add_live_test_subparser(subparsers): live_test_parser.set_defaults(command='live-test') # get cluster choices from available files - clusters = [] - for _, _, files in os.walk('conf/clusters'): - clusters.extend(os.path.splitext(cluster)[0] for cluster in files) + clusters = [os.path.splitext(cluster)[0] for _, _, files + in os.walk('conf/clusters') for cluster in files] # add clusters for user to pick from live_test_parser.add_argument( @@ -145,7 +171,8 @@ def _add_validate_schema_subparser(subparsers): schema_validation_usage = 'manage.py validate-schemas [options]' schema_validation_description = (""" StreamAlertCLI v{} -Run end-to-end tests that will attempt to send alerts +Run validation of schemas in logs.json using configured integration test files. Validation +does not actually run the rules engine on test events. Available Options: @@ -190,6 +217,346 @@ def _add_validate_schema_subparser(subparsers): ) +def _add_metrics_subparser(subparsers): + """Add the metrics subparser: manage.py metrics [options]""" + metrics_usage = 'manage.py metrics [options]' + + # get cluster choices from available files + clusters = [os.path.splitext(cluster)[0] for _, _, files + in os.walk('conf/clusters') for cluster in files] + + cluster_choices_block = ('\n').join('{:>28}{}'.format('', cluster) for cluster in clusters) + + metrics_description = (""" +StreamAlertCLI v{} +Enable or disable metrics for all lambda functions. This toggles the creation of metric filters. + +Available Options: + + -e/--enable Enable CloudWatch metrics through logging and metric filters + -d/--disable Disable CloudWatch metrics through logging and metric filters + -f/--functions Space delimited list of functions to enable metrics for + Choices are: + rule + alert (not implemented) + athena (not implemented) + --debug Enable Debug logger output + +Optional Arguemnts: + + -c/--clusters Space delimited list of clusters to enable metrics for. If + omitted, this will enable metrics for all clusters. 
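A quick standalone sketch (assuming a checkout that has a `conf/clusters` directory) of how the cluster choices shown in these help blocks are discovered; this mirrors the comprehension used by the live-test and metrics subparsers:

```python
import os

# Cluster names come from the JSON filenames under conf/clusters/
clusters = [os.path.splitext(cluster)[0] for _, _, files
            in os.walk('conf/clusters') for cluster in files]

print(clusters)  # e.g. ['prod'] when only conf/clusters/prod.json exists
```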
Choices are: +{} +Examples: + + manage.py metrics --enable --functions rule + +""".format(version, cluster_choices_block)) + + metrics_parser = subparsers.add_parser( + 'metrics', + description=metrics_description, + usage=metrics_usage, + formatter_class=RawTextHelpFormatter, + help=ARGPARSE_SUPPRESS + ) + + # Set the name of this parser to 'metrics' + metrics_parser.set_defaults(command='metrics') + + # allow the user to select 1 or more functions to enable metrics for + metrics_parser.add_argument( + '-f', '--functions', + choices=['rule', 'alert', 'athena'], + help=ARGPARSE_SUPPRESS, + nargs='+', + action=NormalizeFunctionAction, + required=True + ) + + # get the metric toggle value + toggle_group = metrics_parser.add_mutually_exclusive_group(required=True) + + toggle_group.add_argument( + '-e', '--enable', + dest='enable_metrics', + action='store_true' + ) + + toggle_group.add_argument( + '-d', '--disable', + dest='enable_metrics', + action='store_false' + ) + + # allow the user to select 0 or more clusters to enable metrics for + metrics_parser.add_argument( + '-c', '--clusters', + choices=clusters, + help=ARGPARSE_SUPPRESS, + nargs='+', + action=UniqueSetAction, + default=clusters + ) + + # allow verbose output for the CLI with the --debug option + metrics_parser.add_argument( + '--debug', + action='store_true', + help=ARGPARSE_SUPPRESS + ) + + +def _add_metric_alarm_subparser(subparsers): + """Add the create-alarm subparser: manage.py create-alarm [options]""" + metric_alarm_usage = 'manage.py create-alarm [options]' + + # get the available metrics to be used + available_metrics = metrics.MetricLogger.get_available_metrics() + all_metrics = [metric for func in available_metrics for metric in available_metrics[func]] + + metric_choices_block = ('\n').join('{:>35}{}'.format('', metric) for metric in all_metrics) + + # get cluster choices from available files + clusters = [os.path.splitext(cluster)[0] for _, _, files + in os.walk('conf/clusters') for cluster in files] + + cluster_choices_block = ('\n').join('{:>37}{}'.format('', cluster) for cluster in clusters) + + metric_alarm_description = (""" +StreamAlertCLI v{} +Add a CloudWatch alarm for predefined metrics. These are save in the config and +Terraform is used to create the alarms. + +Required Arguments: + + -m/--metric The predefined metric to assign this alarm to. Choices are: +{} + -mt/--metric-target The target of this metric alarm, meaning either the cluster metric + or the aggrea metric. Choices are: + cluster + aggregate + all + -co/--comparison-operator Comparison operator to use for this metric. Choices are: + GreaterThanOrEqualToThreshold + GreaterThanThreshold + LessThanThreshold + LessThanOrEqualToThreshold + -an/--alarm-name The name for the alarm. This name must be unique within the AWS + account + -ep/--evaluation-periods The number of periods over which data is compared to the specified + threshold. The minimum value for this is 1. Also see the 'Other + Constraints' section below + -p/--period The period, in seconds, over which the specified statistic is + applied. Valid values are any multiple of 60. Also see the + 'Other Constraints' section below + -t/--threshold The value against which the specified statistic is compared. This + value should be a double. + +Optional Arguments: + + -ad/--alarm-description The description for the alarm + -c/--clusters Space delimited list of clusters to apply this metric to. This is + ignored if the --metric-target of 'aggregate' is used. 
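As a standalone sketch, the mutually exclusive `--enable`/`--disable` pair added to the metrics subparser above reduces to a single boolean destination; argparse enforces that exactly one of the two flags is given:

```python
from argparse import ArgumentParser

# Both flags write to the same 'enable_metrics' destination
parser = ArgumentParser()
toggle_group = parser.add_mutually_exclusive_group(required=True)
toggle_group.add_argument('-e', '--enable', dest='enable_metrics', action='store_true')
toggle_group.add_argument('-d', '--disable', dest='enable_metrics', action='store_false')

print(parser.parse_args(['--enable']).enable_metrics)   # True
print(parser.parse_args(['--disable']).enable_metrics)  # False
```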
+ Choices are: +{} + -s/--statistic The statistic for the metric associated with the alarm. + Choices are: + SampleCount + Average + Sum + Minimum + Maximum + --debug Enable Debug logger output + +Other Constraints: + + The product of the value for period multiplied by the value for evaluation periods cannot + exceed 86,400. 86,400 is the number of seconds in one day and an alarm's total current + evaluation period can be no longer than one day. + +Examples: + + manage.py create-alarm \\ + --metric FailedParses \\ + --metric-target cluster \\ + --comparison-operator GreaterThanOrEqualToThreshold \\ + --alarm-name FailedParsesAlarm \\ + --evaluation-periods 1 \\ + --period 300 \\ + --threshold 1.0 \\ + --alarm-description 'Alarm for any failed parses that occur within a 5 minute period in the prod cluster' \\ + --clusters prod \\ + --statistic Sum + +Resources: + + AWS: https://docs.aws.amazon.com/AmazonCloudWatch/latest/APIReference/API_PutMetricAlarm.html + Terraform: https://www.terraform.io/docs/providers/aws/r/cloudwatch_metric_alarm.html + +""".format(version, metric_choices_block, cluster_choices_block)) + + metric_alarm_parser = subparsers.add_parser( + 'create-alarm', + description=metric_alarm_description, + usage=metric_alarm_usage, + formatter_class=RawTextHelpFormatter, + help=ARGPARSE_SUPPRESS + ) + + # Set the name of this parser to 'create-alarm' + metric_alarm_parser.set_defaults(command='create-alarm') + + # add all the required parameters + # add metrics for user to pick from. Will be mapped to 'metric_name' in terraform + metric_alarm_parser.add_argument( + '-m', '--metric', + choices=all_metrics, + dest='metric_name', + help=ARGPARSE_SUPPRESS, + required=True + ) + + # check to see what the user wants to apply this metric to (cluster, aggregate, or both) + metric_alarm_parser.add_argument( + '-mt', '--metric-target', + choices=['cluster', 'aggregate', 'all'], + help=ARGPARSE_SUPPRESS, + required=True + ) + + # get the comparison type for this metric + metric_alarm_parser.add_argument( + '-co', '--comparison-operator', + choices=['GreaterThanOrEqualToThreshold', 'GreaterThanThreshold', + 'LessThanThreshold', 'LessThanOrEqualToThreshold'], + help=ARGPARSE_SUPPRESS, + required=True + ) + + # get the name of the alarm + def _alarm_name_validator(val): + if not 1 <= len(val) <= 255: + raise metric_alarm_parser.error('alarm name length must be between 1 and 255') + return val + + metric_alarm_parser.add_argument( + '-an', '--alarm-name', + help=ARGPARSE_SUPPRESS, + required=True, + type=_alarm_name_validator + ) + + # get the evaluation period for this alarm + def _alarm_eval_periods_validator(val): + error = 'evaluation periods must be an integer greater than 0' + try: + period = int(val) + except ValueError: + raise metric_alarm_parser.error(error) + + if period <= 0: + raise metric_alarm_parser.error(error) + return period + + metric_alarm_parser.add_argument( + '-ep', '--evaluation-periods', + help=ARGPARSE_SUPPRESS, + required=True, + type=_alarm_eval_periods_validator + ) + + # get the period for this alarm + def _alarm_period_validator(val): + error = 'period must be an integer in multiples of 60' + try: + period = int(val) + except ValueError: + raise metric_alarm_parser.error(error) + + if period <= 0 or period % 60 != 0: + raise metric_alarm_parser.error(error) + + return period + + metric_alarm_parser.add_argument( + '-p', '--period', + help=ARGPARSE_SUPPRESS, + required=True, + type=_alarm_period_validator + ) + + # get the threshold for this alarm + 
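An aside on the `type=` validators defined above: argparse runs them before any command logic, so bad values are rejected at parse time. A simplified standalone sketch (the real helpers call `metric_alarm_parser.error()` rather than raising `ValueError`):

```python
from argparse import ArgumentParser

def _alarm_period_validator(val):
    """Simplified mirror of the period check above: positive multiples of 60 only"""
    period = int(val)
    if period <= 0 or period % 60 != 0:
        raise ValueError('period must be an integer in multiples of 60')
    return period

parser = ArgumentParser()
parser.add_argument('-p', '--period', type=_alarm_period_validator, required=True)

print(parser.parse_args(['-p', '300']).period)  # 300
# parser.parse_args(['-p', '90']) exits with an argparse 'invalid value' error
```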
metric_alarm_parser.add_argument( + '-t', '--threshold', + help=ARGPARSE_SUPPRESS, + required=True, + type=float + ) + + # all other optional flags + # get the optional alarm description + def _alarm_description_validator(val): + if len(val) > 1024: + raise metric_alarm_parser.error('alarm description length must be less than 1024') + return val + + metric_alarm_parser.add_argument( + '-ad', '--alarm-description', + help=ARGPARSE_SUPPRESS, + type=_alarm_description_validator, + default='' + ) + + # allow the user to select 0 or more clusters to apply this alarm to + metric_alarm_parser.add_argument( + '-c', '--clusters', + choices=clusters, + help=ARGPARSE_SUPPRESS, + nargs='+', + action=UniqueSetAction, + default=[] + ) + + ### Commenting out the below until we can support 'extended-statistic' metrics + ### alongside 'statistic' metrics. Currently only 'statistic' are supported + # # get the extended statistic or statistic value + # statistic_group = metric_alarm_parser.add_mutually_exclusive_group() + # def _extended_stat_validator(val): + # if not re.search(r'p(\d{1,2}(\.\d{0,2})?|100)$', val): + # raise metric_alarm_parser.error('extended statistic values must start with \'p\' ' + # 'and be followed by a percentage value (ie: p0.0, ' + # 'p10, p55.5, p100)') + # return val + # + # statistic_group.add_argument( + # '-es', '--extended-statistic', + # help=ARGPARSE_SUPPRESS, + # type=_extended_stat_validator + # ) + # + # statistic_group.add_argument( + # '-s', '--statistic', + # choices=['SampleCount', 'Average', 'Sum', 'Minimum', 'Maximum'], + # help=ARGPARSE_SUPPRESS + # ) + + metric_alarm_parser.add_argument( + '-s', '--statistic', + choices=['SampleCount', 'Average', 'Sum', 'Minimum', 'Maximum'], + help=ARGPARSE_SUPPRESS, + default='' + ) + + # allow verbose output for the CLI with the --debug option + metric_alarm_parser.add_argument( + '--debug', + action='store_true', + help=ARGPARSE_SUPPRESS + ) + + + def _add_lambda_subparser(subparsers): """Add the Lambda subparser: manage.py lambda [subcommand] [options]""" lambda_usage = 'manage.py lambda [subcommand] [options]' @@ -478,6 +845,8 @@ def build_parser(): _add_output_subparser(subparsers) _add_live_test_subparser(subparsers) _add_validate_schema_subparser(subparsers) + _add_metrics_subparser(subparsers) + _add_metric_alarm_subparser(subparsers) _add_lambda_subparser(subparsers) _add_terraform_subparser(subparsers) _add_configure_subparser(subparsers) diff --git a/stream_alert/rule_processor/handler.py b/stream_alert/rule_processor/handler.py index f80f6e3a6..166d94153 100644 --- a/stream_alert/rule_processor/handler.py +++ b/stream_alert/rule_processor/handler.py @@ -53,6 +53,7 @@ def __init__(self, context, enable_alert_processor=True): self.enable_alert_processor = enable_alert_processor self._failed_record_count = 0 + self._processed_size = 0 self._alerts = [] def run(self, event): @@ -103,6 +104,10 @@ def run(self, event): self._process_alerts(payload) + MetricLogger.log_metric(FUNCTION_NAME, + MetricLogger.TOTAL_PROCESSED_SIZE, + self._processed_size) + LOGGER.debug('Invalid record count: %d', self._failed_record_count) MetricLogger.log_metric(FUNCTION_NAME, @@ -135,6 +140,8 @@ def _process_alerts(self, payload): payload (StreamPayload): StreamAlert payload object being processed """ for record in payload.pre_parse(): + # Increment the processed size using the length of this record + self._processed_size += len(record.pre_parsed_record) self.classifier.classify_record(record) if not record.valid: if self.env['lambda_alias'] 
!= 'development': diff --git a/stream_alert/shared/metrics.py b/stream_alert/shared/metrics.py index b8a323dce..e0aaa60d6 100644 --- a/stream_alert/shared/metrics.py +++ b/stream_alert/shared/metrics.py @@ -24,6 +24,12 @@ CLUSTER = os.environ.get('CLUSTER', 'unknown_cluster') +# The FUNC_PREFIXES dict acts as a simple map to a human-readable name +# Add ATHENA_PARTITION_REFRESH_NAME: 'AthenaPartitionRefresh', to the +# below when metrics are supported there +FUNC_PREFIXES = {ALERT_PROCESSOR_NAME: 'AlertProcessor', + RULE_PROCESSOR_NAME: 'RuleProcessor'} + try: ENABLE_METRICS = bool(int(os.environ.get('ENABLE_METRICS', 0))) except ValueError as err: @@ -45,6 +51,7 @@ class MetricLogger(object): # Constant metric names used for CloudWatch FAILED_PARSES = 'FailedParses' S3_DOWNLOAD_TIME = 'S3DownloadTime' + TOTAL_PROCESSED_SIZE = 'TotalProcessedSize' TOTAL_RECORDS = 'TotalRecords' TOTAL_S3_RECORDS = 'TotalS3Records' TRIGGERED_ALERTS = 'TriggeredAlerts' @@ -64,6 +71,7 @@ class MetricLogger(object): RULE_PROCESSOR_NAME: { FAILED_PARSES: (_default_filter.format(FAILED_PARSES), _default_value_lookup), S3_DOWNLOAD_TIME: (_default_filter.format(S3_DOWNLOAD_TIME), _default_value_lookup), + TOTAL_PROCESSED_SIZE: (_default_filter.format(TOTAL_RECORDS), _default_value_lookup), TOTAL_RECORDS: (_default_filter.format(TOTAL_RECORDS), _default_value_lookup), TOTAL_S3_RECORDS: (_default_filter.format(TOTAL_S3_RECORDS), _default_value_lookup), TRIGGERED_ALERTS: (_default_filter.format(TRIGGERED_ALERTS), _default_value_lookup) diff --git a/stream_alert_cli/config.py b/stream_alert_cli/config.py index 631dc9e7e..fd2108a03 100644 --- a/stream_alert_cli/config.py +++ b/stream_alert_cli/config.py @@ -19,6 +19,8 @@ import re import sys +from stream_alert.shared import metrics +from stream_alert_cli.helpers import continue_prompt from stream_alert_cli.logger import LOGGER_CLI @@ -126,6 +128,236 @@ def set_aws_account_id(self, aws_account_id): LOGGER_CLI.info('AWS Account ID successfully configured') + def toggle_metrics(self, enabled, clusters, lambda_functions): + """Toggle CloudWatch metric logging and filter creation + + Args: + enabled (bool): False if disabling metrics, true if enable_logging + clusters (list): Clusters to enable or disable metrics on + lambda_functions (list): Which lambda functions to enable or disable + metrics on (rule, alert, or athena) + """ + for function in lambda_functions: + if function == metrics.ATHENA_PARTITION_REFRESH_NAME: + if 'athena_partition_refresh_config' in self.config['lambda']: + self.config['lambda']['athena_partition_refresh_config'] \ + ['enable_metrics'] = enabled + else: + LOGGER_CLI.error('No Athena configuration found; please initialize first.') + continue + + for cluster in clusters: + self.config['clusters'][cluster]['modules']['stream_alert'] \ + [function]['enable_metrics'] = enabled + + self.write() + + @staticmethod + def _add_metric_alarm_config(alarm_info, current_alarms): + """Helper function to add the metric alarm to the respective config + + Args: + alarm_info (dict): All the necessary values needed to add a CloudWatch + metric alarm + current_alarms (dict): All of the current metric alarms from the config + + Returns: + dict: The new metric alarms dictionary with the added metric alarm + """ + # Some keys that come from the argparse options can be omitted + omitted_keys = {'debug', 'alarm_name', 'command', 'clusters', 'metric_target'} + + current_alarms[alarm_info['alarm_name']] = { + key: value for key, value in alarm_info.iteritems() + if key not 
in omitted_keys + } + + return current_alarms + + def _add_metric_alarm_per_cluster(self, alarm_info, function_name): + """Add a metric alarm for individual clusters. This is for non-aggregate + CloudWatch metric alarms. + + Args: + alarm_info (dict): All the necessary values needed to add a CloudWatch + metric alarm. + function_name (str): The name of the lambda function this metric is + related to. + """ + # If no clusters have been specified by the user, we can assume this alarm + # should be created for all available clusters, so fall back to that + clusters = (alarm_info['clusters'] if alarm_info['clusters'] else + list(self.config['clusters'])) + + # Go over each of the clusters and see if enable_metrics == True and prompt + # the user to toggle metrics on if this is False + for cluster in clusters: + function_config = (self.config['clusters'][cluster]['modules'] + ['stream_alert'][function_name]) + + if not function_config.get('enable_metrics'): + prompt = ('Metrics are not currently enabled for the \'{}\' function ' + 'within the \'{}\' cluster. Would you like to enable metrics ' + 'for this cluster?'.format(function_name, cluster)) + + if continue_prompt(prompt): + self.toggle_metrics(True, [cluster], [function_name]) + + elif not continue_prompt('Would you still like to add this alarm ' + 'even though metrics are disabled?'): + continue + + metric_alarms = function_config.get('metric_alarms', {}) + + # Format the metric name for the cluster based metric + # Prepend a prefix for this function and append the cluster name + alarm_settings = alarm_info.copy() + alarm_settings['metric_name'] = '{}-{}-{}'.format(metrics.FUNC_PREFIXES[function_name], + alarm_settings['metric_name'], + cluster.upper()) + + new_alarms = self._add_metric_alarm_config(alarm_settings, metric_alarms) + if new_alarms != False: + function_config['metric_alarms'] = new_alarms + LOGGER_CLI.info('Successfully added \'%s\' metric alarm for the \'%s\' ' + 'function to \'conf/clusters/%s.json.\'', + alarm_settings['alarm_name'], function_name, cluster) + + def _alarm_exists(self, alarm_name): + """Check if this alarm name is already used somewhere. CloudWatch alarm + names must be unique to an AWS account + + Args: + alarm_name (str): The name of the alarm being created + + Returns: + bool: True if the the alarm name is already present in the config + """ + message = ('CloudWatch metric alarm names must be unique ' + 'within each AWS account. Please remove this alarm ' + 'so it can be updated or choose another name.') + funcs = {metrics.ALERT_PROCESSOR_NAME, metrics.RULE_PROCESSOR_NAME} + for func in funcs: + for cluster in self.config['clusters']: + func_alarms = (self.config['clusters'][cluster]['modules'] + ['stream_alert'][func].get('metric_alarms', {})) + if alarm_name in func_alarms: + LOGGER_CLI.error('An alarm with name \'%s\' already exists in the ' + '\'conf/clusters/%s.json\' cluster. %s', alarm_name, + cluster, message) + return True + + global_config = self.config['global']['infrastructure'].get('monitoring') + if not global_config: + return False + + metric_alarms = global_config.get('metric_alarms') + if not metric_alarms: + return False + + # Check for athena metric alarms also, which are save in the global config + funcs.add(metrics.ATHENA_PARTITION_REFRESH_NAME) + + for func in funcs: + global_func_alarms = global_config['metric_alarms'].get(func, {}) + if alarm_name in global_func_alarms: + LOGGER_CLI.error('An alarm with name \'%s\' already exists in the ' + '\'conf/globals.json\'. 
%s', alarm_name, message) + return True + + return False + + def add_metric_alarm(self, alarm_info): + """Add a metric alarm that corresponds to a predefined metrics + + Args: + alarm_info (dict): All the necessary values needed to add a CloudWatch + metric alarm + """ + # Check to see if an alarm with this name already exists + if self._alarm_exists(alarm_info['alarm_name']): + return + + # Get the current metrics for each function + current_metrics = metrics.MetricLogger.get_available_metrics() + + # Extract the function name this metric is associated with + metric_function = {metric: function for function in current_metrics + for metric in current_metrics[function]}[alarm_info['metric_name']] + + # Do not continue if the user is trying to apply a metric alarm for an athena + # metric to a specific cluster (since the athena function operates on all clusters) + if (alarm_info['metric_target'] != 'aggregate' and + metric_function == metrics.ATHENA_PARTITION_REFRESH_NAME): + LOGGER_CLI.error('Metrics for the athena function can only be applied ' + 'to an aggregate metric target, not on a per-cluster basis.') + return + + # If the metric is related to either the rule processor or alert processor, we should + # check to see if any cluster has metrics enabled for that function before continuing + if (metric_function in {metrics.ALERT_PROCESSOR_NAME, metrics.RULE_PROCESSOR_NAME} and + not any(self.config['clusters'][cluster]['modules']['stream_alert'] + [metric_function].get('enable_metrics') for cluster in + self.config['clusters'])): + prompt = ('Metrics are not currently enabled for the \'{}\' function ' + 'within any cluster. Creating an alarm will have no effect ' + 'until metrics are enabled for this function in at least one ' + 'cluster. Would you still like to continue?'.format(metric_function)) + if not continue_prompt(prompt): + return + + elif metric_function == metrics.ATHENA_PARTITION_REFRESH_NAME: + # If the user is attempting to add a metric for athena, make sure the athena + # function is initialized first + if 'athena_partition_refresh_config' not in self.config['lambda']: + LOGGER_CLI.error('No configuration found for Athena Partition Refresh. ' + 'Please run: `$ python manage.py athena init` first.') + return + + # If the athena function is initialized, but metrics are not enabled, ask + # the user if they would like to enable them now + if not self.config['lambda']['athena_partition_refresh_config'].get('enable_metrics'): + prompt = ('Metrics are not currently enabled for the \'athena\' function. 
' + 'Would you like to enable metrics for athena?') + + if continue_prompt(prompt): + self.toggle_metrics(True, None, [metric_function]) + + elif not continue_prompt('Would you still like to add this alarm ' + 'even though metrics are disabled?'): + return + + # Add metric alarms for the aggregate metrics - these are added to the global config + if (alarm_info['metric_target'] == 'aggregate' or + metric_function == metrics.ATHENA_PARTITION_REFRESH_NAME): + global_config = self.config['global']['infrastructure']['monitoring'] + + metric_alarms = global_config.get('metric_alarms', {}) + if not metric_alarms: + global_config['metric_alarms'] = {} + + metric_alarms = global_config['metric_alarms'].get(metric_function, {}) + if not metric_alarms: + global_config['metric_alarms'][metric_function] = {} + + # Format the metric name for the aggregate metric + alarm_settings = alarm_info.copy() + alarm_settings['metric_name'] = '{}-{}'.format(metrics.FUNC_PREFIXES[metric_function], + alarm_info['metric_name']) + + new_alarms = self._add_metric_alarm_config(alarm_settings, metric_alarms) + if new_alarms != False: + global_config['metric_alarms'][metric_function] = new_alarms + LOGGER_CLI.info('Successfully added \'%s\' metric alarm to ' + '\'conf/global.json.\'', alarm_settings['alarm_name']) + + else: + # Add metric alarms on a per-cluster basis - these are added to the cluster config + self._add_metric_alarm_per_cluster(alarm_info, metric_function) + + # Save all of the alarm updates to disk + self.write() + def load(self): """Load the cluster, global, and lambda configuration files diff --git a/stream_alert_cli/helpers.py b/stream_alert_cli/helpers.py index 6229dff51..b4fa817c0 100644 --- a/stream_alert_cli/helpers.py +++ b/stream_alert_cli/helpers.py @@ -67,6 +67,17 @@ def run_command(runner_args, **kwargs): return True +def continue_prompt(prompt=''): + """Continue prompt used to check user's response""" + required_responses = {'yes', 'no'} + response = '' + while response not in required_responses: + prompt = prompt or 'Would you like to continue?' + response = raw_input('\n{} (yes or no): '.format(prompt)) + + return response == 'yes' + + def format_lambda_test_record(test_record): """Create a properly formatted Kinesis, S3, or SNS record. diff --git a/stream_alert_cli/runner.py b/stream_alert_cli/runner.py index 75c6b2d8f..235f264db 100644 --- a/stream_alert_cli/runner.py +++ b/stream_alert_cli/runner.py @@ -25,6 +25,7 @@ from stream_alert_cli import helpers from stream_alert_cli.config import CLIConfig +from stream_alert_cli.helpers import continue_prompt from stream_alert_cli.logger import LOGGER_CLI import stream_alert_cli.outputs as config_outputs from stream_alert_cli.package import AlertProcessorPackage, AthenaPackage, RuleProcessorPackage @@ -73,6 +74,12 @@ def cli_runner(options): elif options.command == 'athena': athena_handler(options) + elif options.command == 'metrics': + _toggle_metrics(options) + + elif options.command == 'create-alarm': + _create_alarm(options) + def athena_handler(options): """Handle Athena operations""" @@ -315,16 +322,6 @@ def run_command(args=None, **kwargs): return helpers.run_command(args, **kwargs) -def continue_prompt(): - """Continue prompt used before applying Terraform plans""" - required_responses = {'yes', 'no'} - response = '' - while response not in required_responses: - response = raw_input('\nWould you like to continue? 
(yes or no): ') - if response == 'no': - sys.exit(0) - - def tf_runner(**kwargs): """Terraform wrapper to build StreamAlert infrastructure. @@ -360,7 +357,8 @@ def tf_runner(**kwargs): if not run_command(tf_command): return False - continue_prompt() + if not continue_prompt(): + sys.exit(0) if action == 'destroy': LOGGER_CLI.info('Destroying infrastructure') @@ -649,3 +647,40 @@ def configure_output(options): 'output configuration for service \'%s\'', props['descriptor'].value, options.service) + + +def _toggle_metrics(options): + """Enable or disable logging CloudWatch metrics + + Args: + options (argparser): Contains boolean necessary for toggling metrics + """ + CONFIG.toggle_metrics(options.enable_metrics, options.clusters, options.functions) + + +def _create_alarm(options): + """Create a new CloudWatch alarm for the given metric + + Args: + options (argparser): Contains all of the necessary info for configuring + a CloudWatch alarm + """ + # Perform safety check for max total evaluation period. This logic cannot + # be performed by argparse so must be performed now. + seconds_in_day = 86400 + if options.period * options.evaluation_periods > seconds_in_day: + LOGGER_CLI.error('The product of the value for period multiplied by the ' + 'value for evaluation periods cannot exceed 86,400. 86,400 ' + 'is the number of seconds in one day and an alarm\'s total ' + 'current evaluation period can be no longer than one day.') + return + + # Check to see if the user is specifying clusters when trying to create an + # alarm on an aggregate metric. Aggregate metrics encompass all clusters so + # specification of clusters doesn't have any real effect + if options.metric_target == 'aggregate' and options.clusters: + LOGGER_CLI.error('Specifying clusters when creating an alarm on an aggregate ' + 'metric has no effect. 
Please remove the -c/--clusters flag.') + return + + CONFIG.add_metric_alarm(vars(options)) diff --git a/stream_alert_cli/terraform_generate.py b/stream_alert_cli/terraform_generate.py index f37a63698..0b00c6dad 100644 --- a/stream_alert_cli/terraform_generate.py +++ b/stream_alert_cli/terraform_generate.py @@ -16,10 +16,12 @@ from collections import defaultdict import json import os +import string from stream_alert.shared import metrics from stream_alert_cli.logger import LOGGER_CLI +DEFAULT_SNS_MONITORING_TOPIC = 'stream_alert_monitoring' RESTRICTED_CLUSTER_NAMES = ('main', 'athena') @@ -160,10 +162,48 @@ def generate_main(**kwargs): infrastructure_config = config['global'].get('infrastructure') if infrastructure_config and 'monitoring' in infrastructure_config: if infrastructure_config['monitoring'].get('create_sns_topic'): - main_dict['resource']['aws_sns_topic']['stream_alert_monitoring'] = { - 'name': 'stream_alert_monitoring' + main_dict['resource']['aws_sns_topic'][DEFAULT_SNS_MONITORING_TOPIC] = { + 'name': DEFAULT_SNS_MONITORING_TOPIC } + # Add any global cloudwatch alarms to the main.tf + monitoring_config = config['global']['infrastructure'].get('monitoring') + if not monitoring_config: + return main_dict + + global_metrics = monitoring_config.get('metric_alarms') + if not global_metrics: + return main_dict + + topic_name = (DEFAULT_SNS_MONITORING_TOPIC if infrastructure_config + ['monitoring'].get('create_sns_topic') else + infrastructure_config['monitoring'].get('sns_topic_name')) + + sns_topic_arn = 'arn:aws:sns:{region}:{account_id}:{topic}'.format( + region=config['global']['account']['region'], + account_id=config['global']['account']['aws_account_id'], + topic=topic_name + ) + + formatted_alarms = {} + # Add global metric alarms for the rule and alert processors + for func in metrics.FUNC_PREFIXES: + if not func in global_metrics: + continue + + for name, settings in global_metrics[func].iteritems(): + alarm_info = settings.copy() + alarm_info['alarm_name'] = name + alarm_info['namespace'] = 'StreamAlert' + alarm_info['alarm_actions'] = [sns_topic_arn] + # Terraform only allows certain characters in resource names, so strip the name + acceptable_chars = ''.join([string.digits, string.letters, '_-']) + name = filter(acceptable_chars.__contains__, name) + formatted_alarms['metric_alarm_{}'.format(name)] = alarm_info + + if formatted_alarms: + main_dict['resource']['aws_cloudwatch_metric_alarm'] = formatted_alarms + return main_dict @@ -216,8 +256,6 @@ def generate_stream_alert(cluster_name, cluster_dict, config): Returns: bool: Result of applying the stream_alert module """ - enable_metrics = config['global'].get('infrastructure', - {}).get('metrics', {}).get('enabled', False) account = config['global']['account'] modules = config['clusters'][cluster_name]['modules'] @@ -228,7 +266,8 @@ def generate_stream_alert(cluster_name, cluster_dict, config): 'prefix': account['prefix'], 'cluster': cluster_name, 'kms_key_arn': '${aws_kms_key.stream_alert_secrets.arn}', - 'rule_processor_enable_metrics': enable_metrics, + 'rule_processor_enable_metrics': modules['stream_alert'] \ + ['rule_processor'].get('enable_metrics', False), 'rule_processor_log_level': modules['stream_alert'] \ ['rule_processor'].get('log_level', 'info'), 'rule_processor_memory': modules['stream_alert']['rule_processor']['memory'], @@ -236,7 +275,8 @@ def generate_stream_alert(cluster_name, cluster_dict, config): 'rule_processor_version': modules['stream_alert']['rule_processor']['current_version'], 
'rule_processor_config': '${var.rule_processor_config}', 'alert_processor_config': '${var.alert_processor_config}', - 'alert_processor_enable_metrics': enable_metrics, + 'alert_processor_enable_metrics': modules['stream_alert'] \ + ['alert_processor'].get('enable_metrics', False), 'alert_processor_log_level': modules['stream_alert'] \ ['alert_processor'].get('log_level', 'info'), 'alert_processor_memory': modules['stream_alert']['alert_processor']['memory'], @@ -281,36 +321,33 @@ def generate_stream_alert(cluster_name, cluster_dict, config): return True -def generate_cloudwatch_log_metrics(cluster_name, cluster_dict, config): - """Add the CloudWatch Metric Filters module to the Terraform cluster dict. + +def generate_cloudwatch_metric_filters(cluster_name, cluster_dict, config): + """Add the CloudWatch Metric Filters information to the Terraform cluster dict. Args: cluster_name (str): The name of the currently generating cluster cluster_dict (defaultdict): The dict containing all Terraform config for a given cluster. config (dict): The loaded config from the 'conf/' directory - - Returns: - bool: Result of applying the cloudwatch metric filters to the stream_alert module """ - enable_metrics = config['global'].get('infrastructure', - {}).get('metrics', {}).get('enabled', False) - - # Do not add any metric filters if metrics are disabled - if not enable_metrics: - return + stream_alert_config = config['clusters'][cluster_name]['modules']['stream_alert'] current_metrics = metrics.MetricLogger.get_available_metrics() # Add metric filters for the rule and alert processor - # The funcs dict acts as a simple map to a human-readable name - funcs = {metrics.ALERT_PROCESSOR_NAME: 'AlertProcessor', - metrics.RULE_PROCESSOR_NAME: 'RuleProcessor'} - - for func in funcs: + for func, metric_prefix in metrics.FUNC_PREFIXES.iteritems(): if func not in current_metrics: continue - metric_prefix = funcs[func] + if func not in stream_alert_config: + LOGGER_CLI.error('Function for metrics \'%s\' is not defined in stream alert config. ' + 'Options are: %s', func, + ', '.join('\'{}\''.format(key) for key in stream_alert_config)) + continue + + if not stream_alert_config[func].get('enable_metrics'): + continue + filter_pattern_idx, filter_value_idx = 0, 1 # Add filters for the cluster and aggregate @@ -333,6 +370,80 @@ def generate_cloudwatch_log_metrics(cluster_name, cluster_dict, config): ['{}_metric_filters'.format(func)] = filters +def _format_metric_alarm(name, alarm_settings): + """Helper function to format a metric alarm as a comma-separated string + + Args: + name (str): The name of the alarm to create + alarm_info (dict): All other settings for this alarm (threshold, etc) + function (str): The respective function this alarm is being created for. 
+ This is the RuleProcessor or AlertProcessor + cluster (str): The cluster that this metric is related to + + Returns: + str: formatted and comma-separated string containing alarm settings + """ + alarm_info = alarm_settings.copy() + # The alarm description and name can potentially have commas so remove them + alarm_info['alarm_description'] = alarm_info['alarm_description'].replace(',', '') + + attributes = list(alarm_info) + attributes.sort() + sorted_values = [str(alarm_info[attribute]) if alarm_info[attribute] + else '' for attribute in attributes] + + sorted_values.insert(0, name.replace(',', '')) + + return ','.join(sorted_values) + + +def generate_cloudwatch_metric_alarms(cluster_name, cluster_dict, config): + """Add the CloudWatch Metric Alarms information to the Terraform cluster dict. + + Args: + cluster_name (str): The name of the currently generating cluster + cluster_dict (defaultdict): The dict containing all Terraform config for a given cluster. + config (dict): The loaded config from the 'conf/' directory + """ + infrastructure_config = config['global'].get('infrastructure') + + if not (infrastructure_config and 'monitoring' in infrastructure_config): + LOGGER_CLI.error('Invalid config: Make sure you declare global infrastructure options!') + return + + topic_name = (DEFAULT_SNS_MONITORING_TOPIC if infrastructure_config + ['monitoring'].get('create_sns_topic') else + infrastructure_config['monitoring'].get('sns_topic_name')) + + sns_topic_arn = 'arn:aws:sns:{region}:{account_id}:{topic}'.format( + region=config['global']['account']['region'], + account_id=config['global']['account']['aws_account_id'], + topic=topic_name + ) + + cluster_dict['module']['stream_alert_{}'.format(cluster_name)] \ + ['sns_topic_arn'] = sns_topic_arn + + stream_alert_config = config['clusters'][cluster_name]['modules']['stream_alert'] + + # Add cluster metric alarms for the rule and alert processors + formatted_alarms = [] + for func_config in stream_alert_config.values(): + if 'metric_alarms' not in func_config: + continue + + # TODO: update this logic to simply use a list of maps once Terraform fixes + # their support for this, instead of the comma-separated string this creates + metric_alarms = func_config['metric_alarms'] + for name, alarm_info in metric_alarms.iteritems(): + formatted_alarms.append( + _format_metric_alarm(name, alarm_info) + ) + + cluster_dict['module']['stream_alert_{}'.format(cluster_name)] \ + ['metric_alarms'] = formatted_alarms + + def generate_cloudwatch_monitoring(cluster_name, cluster_dict, config): """Add the CloudWatch Monitoring module to the Terraform cluster dict. 
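Because Terraform's support for lists of maps is limited, each alarm is flattened into one comma-separated string before it reaches the `tf_stream_alert` module. A sketch of that formatting step using a hypothetical FailedParses alarm (field values are illustrative):

```python
# Hypothetical alarm settings, shaped like the values stored in a cluster config
alarm_settings = {
    'alarm_description': 'Alarm for failed parses',
    'comparison_operator': 'GreaterThanOrEqualToThreshold',
    'evaluation_periods': 1,
    'metric_name': 'RuleProcessor-FailedParses-PROD',
    'period': 300,
    'statistic': 'Sum',
    'threshold': 1.0
}

# Mirrors _format_metric_alarm: attributes sorted alphabetically, alarm name prepended
attributes = sorted(alarm_settings)
values = [str(alarm_settings[attr]) if alarm_settings[attr] else '' for attr in attributes]
values.insert(0, 'FailedParsesAlarm')

# FailedParsesAlarm,Alarm for failed parses,GreaterThanOrEqualToThreshold,1,RuleProcessor-FailedParses-PROD,300,Sum,1.0
print(','.join(values))
```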
@@ -346,25 +457,21 @@ def generate_cloudwatch_monitoring(cluster_name, cluster_dict, config): """ prefix = config['global']['account']['prefix'] infrastructure_config = config['global'].get('infrastructure') - sns_topic_arn = None - if infrastructure_config and 'monitoring' in infrastructure_config: - if infrastructure_config['monitoring'].get('create_sns_topic'): - sns_topic_arn = 'arn:aws:sns:{region}:{account_id}:{topic}'.format( - region=config['global']['account']['region'], - account_id=config['global']['account']['aws_account_id'], - topic='stream_alert_monitoring' - ) - elif infrastructure_config['monitoring'].get('sns_topic_name'): - sns_topic_arn = 'arn:aws:sns:{region}:{account_id}:{topic}'.format( - region=config['global']['account']['region'], - account_id=config['global']['account']['aws_account_id'], - topic=infrastructure_config['monitoring']['sns_topic_name'] - ) - else: + if not (infrastructure_config and 'monitoring' in infrastructure_config): LOGGER_CLI.error('Invalid config: Make sure you declare global infrastructure options!') return False + topic_name = DEFAULT_SNS_MONITORING_TOPIC if infrastructure_config \ + ['monitoring'].get('create_sns_topic') else \ + infrastructure_config['monitoring'].get('sns_topic_name') + + sns_topic_arn = 'arn:aws:sns:{region}:{account_id}:{topic}'.format( + region=config['global']['account']['region'], + account_id=config['global']['account']['aws_account_id'], + topic=topic_name + ) + lambda_functions = [ '{}_{}_streamalert_rule_processor'.format(prefix, cluster_name), '{}_{}_streamalert_alert_processor'.format(prefix, cluster_name) @@ -615,7 +722,9 @@ def generate_cluster(**kwargs): if not generate_stream_alert(cluster_name, cluster_dict, config): return - generate_cloudwatch_log_metrics(cluster_name, cluster_dict, config) + generate_cloudwatch_metric_filters(cluster_name, cluster_dict, config) + + generate_cloudwatch_metric_alarms(cluster_name, cluster_dict, config) if modules['cloudwatch_monitoring']['enabled']: if not generate_cloudwatch_monitoring(cluster_name, cluster_dict, config): @@ -661,8 +770,6 @@ def generate_athena(config): """ athena_dict = infinitedict() athena_config = config['lambda']['athena_partition_refresh_config'] - enable_metrics = config['global'].get('infrastructure', - {}).get('metrics', {}).get('enabled', False) data_buckets = set() for refresh_type in athena_config['refresh_type']: @@ -679,11 +786,11 @@ def generate_athena(config): 'athena_data_buckets': list(data_buckets), 'refresh_interval': athena_config.get('refresh_interval', 'rate(10 minutes)'), 'current_version': athena_config['current_version'], - 'enable_metrics': enable_metrics, + 'enable_metrics': athena_config.get('enable_metrics', False), 'prefix': config['global']['account']['prefix'] } - if not enable_metrics: + if not athena_config.get('enable_metrics', False): return athena_dict # Check to see if there are any metrics configured for the athena function diff --git a/stream_alert_cli/test.py b/stream_alert_cli/test.py index c083ba5f1..b20ff8505 100644 --- a/stream_alert_cli/test.py +++ b/stream_alert_cli/test.py @@ -191,7 +191,7 @@ def _run_rule_tests(self, rule_name, test_record, formatted_record, print_header # Print rule name for section header, but only if we get # to a point where there is a record to actually be tested. 
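The Terraform generation above now reads `enable_metrics` from each function's own config block instead of the removed global `infrastructure.metrics` flag. A minimal sketch of that lookup against hypothetical config data shaped like `conf/lambda.json` and `conf/clusters/*.json`:

```python
# Hypothetical, minimal config shaped like the files touched in this diff
config = {
    'lambda': {
        'athena_partition_refresh_config': {'enabled': True, 'enable_metrics': False}
    },
    'clusters': {
        'prod': {'modules': {'stream_alert': {'rule_processor': {'enable_metrics': True}}}}
    }
}

# Athena function: defaults to False when the key is absent
athena_config = config['lambda']['athena_partition_refresh_config']
print(athena_config.get('enable_metrics', False))  # False

# Rule processor, looked up per cluster
modules = config['clusters']['prod']['modules']
print(modules['stream_alert']['rule_processor'].get('enable_metrics', False))  # True
```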
- # This avoids potentialy blank sections + # This avoids potentially blank sections if print_header_line and (alerts or self.print_output): print '\n{}'.format(rule_name) diff --git a/terraform/modules/tf_stream_alert/main.tf b/terraform/modules/tf_stream_alert/main.tf index 8d2527de9..cd41bbb69 100644 --- a/terraform/modules/tf_stream_alert/main.tf +++ b/terraform/modules/tf_stream_alert/main.tf @@ -180,3 +180,22 @@ resource "aws_cloudwatch_log_metric_filter" "alert_processor_cw_metric_filters" value = "${element(split(",", var.alert_processor_metric_filters[count.index]), 2)}" } } + +// CloudWatch metric alarms that are created per-cluster +// The split list is our way around poor tf support for lists of maps and is made up of: +// , , , , +// , , , +// TODO: update this logic to simply use a variable that is a list of maps once Terraform fixes this +resource "aws_cloudwatch_metric_alarm" "cw_metric_alarms" { + count = "${length(var.metric_alarms)}" + alarm_name = "${element(split(",", var.metric_alarms[count.index]), 0)}" + alarm_description = "${element(split(",", var.metric_alarms[count.index]), 1)}" + comparison_operator = "${element(split(",", var.metric_alarms[count.index]), 2)}" + evaluation_periods = "${element(split(",", var.metric_alarms[count.index]), 3)}" + metric_name = "${element(split(",", var.metric_alarms[count.index]), 4)}" + period = "${element(split(",", var.metric_alarms[count.index]), 5)}" + statistic = "${element(split(",", var.metric_alarms[count.index]), 6)}" + threshold = "${element(split(",", var.metric_alarms[count.index]), 7)}" + namespace = "${var.namespace}" + alarm_actions = ["${var.sns_topic_arn}"] +} diff --git a/terraform/modules/tf_stream_alert/variables.tf b/terraform/modules/tf_stream_alert/variables.tf index 9b06cd835..ec8e832c3 100644 --- a/terraform/modules/tf_stream_alert/variables.tf +++ b/terraform/modules/tf_stream_alert/variables.tf @@ -97,7 +97,14 @@ variable "alert_processor_metric_filters" { default = [] } +variable "metric_alarms" { + type = "list" + default = [] +} + variable "namespace" { type = "string" default = "StreamAlert" } + +variable "sns_topic_arn" {} diff --git a/tests/unit/stream_alert_cli/test_terraform_generate.py b/tests/unit/stream_alert_cli/test_terraform_generate.py index 58dd8b832..a3966929b 100644 --- a/tests/unit/stream_alert_cli/test_terraform_generate.py +++ b/tests/unit/stream_alert_cli/test_terraform_generate.py @@ -585,14 +585,15 @@ def test_generate_athena(self): 'prefix': 'unit-testing' }, 'infrastructure': { - 'metrics': { - 'enabled': True + 'monitoring': { + 'create_sns_topic': True } } }, 'lambda': { 'athena_partition_refresh_config': { 'enabled': True, + 'enable_metrics': True, 'current_version': '$LATEST', 'refresh_type': { 'repair_hive_table': {
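On the Terraform side, the `cw_metric_alarms` resource above recovers each alarm field positionally with `element(split(",", ...), i)`. The same unpacking expressed in Python terms, using the example string from the earlier formatting note:

```python
alarm = ('FailedParsesAlarm,Alarm for failed parses,GreaterThanOrEqualToThreshold,'
         '1,RuleProcessor-FailedParses-PROD,300,Sum,1.0')

fields = alarm.split(',')
# Positional layout consumed by the aws_cloudwatch_metric_alarm resource:
# 0 alarm_name, 1 alarm_description, 2 comparison_operator, 3 evaluation_periods,
# 4 metric_name, 5 period, 6 statistic, 7 threshold
print(fields[0])  # FailedParsesAlarm
print(fields[4])  # RuleProcessor-FailedParses-PROD
print(fields[7])  # 1.0
```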