Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create alert merger which dispatches alerts from Dynamo to alert processors #642

Merged
merged 7 commits into from
Mar 21, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions conf/lambda.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
"alert_merger_config": {
"concurrency_limit": 1,
"current_version": "$LATEST",
"enable_metrics": true,
"handler": "stream_alert.alert_merger.main.handler",
"log_level": "info",
"log_retention_days": 14,
Expand Down
5 changes: 4 additions & 1 deletion stream_alert/alert_merger/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import time

from stream_alert.alert_merger import LOGGER
from stream_alert.shared.metrics import ALERT_MERGER_NAME, MetricLogger

import boto3
from boto3.dynamodb.conditions import Attr, Key
Expand Down Expand Up @@ -132,8 +133,10 @@ def __init__(self):

def _dispatch_alert(self, alert):
"""Dispatch all alerts which need to be sent to the rule processor."""
this_attempt_num = alert.get('Attempts', 0) + 1
LOGGER.info('Dispatching alert %s to %s (attempt %d)',
alert['AlertID'], self.alert_proc, alert.get('Attempts', 0) + 1)
alert['AlertID'], self.alert_proc, this_attempt_num)
MetricLogger.log_metric(ALERT_MERGER_NAME, MetricLogger.ALERT_ATTEMPTS, this_attempt_num)

self.lambda_client.invoke(
FunctionName=self.alert_proc,
Expand Down
14 changes: 12 additions & 2 deletions stream_alert/shared/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import os

from stream_alert.shared import (
ALERT_MERGER_NAME,
ALERT_PROCESSOR_NAME,
ATHENA_PARTITION_REFRESH_NAME,
LOGGER,
Expand All @@ -27,7 +28,10 @@
# The FUNC_PREFIXES dict acts as a simple map to a human-readable name
# Add ATHENA_PARTITION_REFRESH_NAME: 'AthenaPartitionRefresh', to the
# below when metrics are supported there
FUNC_PREFIXES = {RULE_PROCESSOR_NAME: 'RuleProcessor'}
FUNC_PREFIXES = {
ALERT_MERGER_NAME: 'AlertMerger',
RULE_PROCESSOR_NAME: 'RuleProcessor'
}

try:
ENABLE_METRICS = bool(int(os.environ.get('ENABLE_METRICS', 0)))
Expand All @@ -47,7 +51,7 @@ class MetricLogger(object):
accessing properties and avoids doing dict lookups a ton.
"""

# Constant metric names used for CloudWatch
# Rule Processor metric names
FAILED_PARSES = 'FailedParses'
S3_DOWNLOAD_TIME = 'S3DownloadTime'
TOTAL_PROCESSED_SIZE = 'TotalProcessedSize'
Expand All @@ -60,6 +64,9 @@ class MetricLogger(object):
FIREHOSE_FAILED_RECORDS = 'FirehoseFailedRecords'
NORMALIZED_RECORDS = 'NormalizedRecords'

# Alert Merger metric names
ALERT_ATTEMPTS = 'AlertAttempts'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🎉


_default_filter = '{{ $.metric_name = "{}" }}'
_default_value_lookup = '$.metric_value'

Expand All @@ -70,6 +77,9 @@ class MetricLogger(object):
# If additional metric logging is added that does not conform to this default
# configuration, new filters & lookups should be created to handle them as well.
_available_metrics = {
ALERT_MERGER_NAME: {
ALERT_ATTEMPTS: (_default_filter.format(ALERT_ATTEMPTS), _default_value_lookup)
},
ALERT_PROCESSOR_NAME: {}, # Placeholder for future alert processor metrics
ATHENA_PARTITION_REFRESH_NAME: {}, # Placeholder for future athena processor metrics
RULE_PROCESSOR_NAME: {
Expand Down
2 changes: 1 addition & 1 deletion stream_alert_cli/terraform/alert_merger.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def generate_alert_merger(config):
ALERT_MERGER_NAME, config, {
'ALERTS_TABLE': '{}_streamalert_alerts'.format(prefix),
'ALERT_PROCESSOR': '{}_streamalert_alert_processor'.format(prefix),
'ALERT_PROCESSOR_TIMEOUT_SEC': config['lambda']['alert_processor_config']['timeout']
'ALERT_PROCESSOR_TIMEOUT_SEC': config['lambda']['alert_processor_config']['timeout'],
}
)

Expand Down
24 changes: 22 additions & 2 deletions stream_alert_cli/terraform/lambda_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
limitations under the License.
"""
from stream_alert import shared
from stream_alert.shared import metrics
from stream_alert_cli.terraform.common import monitoring_topic_arn


Expand Down Expand Up @@ -48,6 +49,22 @@ def _tf_metric_alarms(lambda_config, sns_arn):
return result


def _tf_metric_filters(lambda_config, function_name):
    """Compute metric filter Terraform configuration from the Lambda config.

    Args:
        lambda_config (dict): Config for a single Lambda function (from conf/lambda.json)
        function_name (str): Name of the Lambda function (e.g. 'alert_merger')

    Returns:
        dict: {'log_metric_filters': [...]} where each list entry is a CSV string of
            "<metric_name>,<filter_pattern>,<filter_value>" (the format expected by the
            tf_lambda module), or an empty dict when metrics are disabled for this function.
    """
    if not lambda_config.get('enable_metrics'):
        return {}

    # Create a metric filter for each custom metric associated with this function.
    # Iterate in sorted order so the generated Terraform is deterministic and does not
    # produce spurious plan diffs when dict iteration order changes between runs.
    metric_filters = []
    function_metrics = metrics.MetricLogger.get_available_metrics()[function_name]
    for metric, settings in sorted(function_metrics.items()):
        metric_name = '{}-{}'.format(metrics.FUNC_PREFIXES[function_name], metric)
        filter_pattern, filter_value = settings
        metric_filters.append('{},{},{}'.format(metric_name, filter_pattern, filter_value))

    return {'log_metric_filters': metric_filters}


def _tf_vpc_config(lambda_config):
"""Compute VPC configuration from the Lambda config."""
result = {}
Expand All @@ -70,7 +87,7 @@ def generate_lambda(function_name, config, environment=None):
function_name (str): Name of the Lambda function (e.g. 'alert_processor')
config (dict): Parsed config from conf/
environment (dict): Optional environment variables to specify.
LOGGER_LEVEL is included automatically.
ENABLE_METRICS and LOGGER_LEVEL are included automatically.

Example Lambda config:
{
Expand Down Expand Up @@ -115,6 +132,8 @@ def generate_lambda(function_name, config, environment=None):

# Add logger level to any custom environment variables
environment_variables = {
# Convert True/False to "1" or "0", respectively
'ENABLE_METRICS': str(int(lambda_config.get('enable_metrics', False))),
'LOGGER_LEVEL': lambda_config.get('log_level', 'info')
}
if environment:
Expand All @@ -139,8 +158,9 @@ def generate_lambda(function_name, config, environment=None):
if key in lambda_config:
lambda_module[key] = lambda_config[key]

# Add metric alarms to the Lambda module definition
# Add metric alarms and filters to the Lambda module definition
lambda_module.update(_tf_metric_alarms(lambda_config, monitoring_topic_arn(config)))
lambda_module.update(_tf_metric_filters(lambda_config, function_name))

# Add VPC config to the Lambda module definition
lambda_module.update(_tf_vpc_config(lambda_config))
Expand Down
4 changes: 0 additions & 4 deletions stream_alert_cli/terraform/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,6 @@ def generate_cloudwatch_metric_filters(cluster_name, cluster_dict, config):
continue

if func not in stream_alert_config:
LOGGER_CLI.error(
'Function for metrics \'%s\' is not defined in stream alert config. '
'Options are: %s', func, ', '.join(
'\'{}\''.format(key) for key in stream_alert_config))
continue

if not stream_alert_config[func].get('enable_metrics'):
Expand Down
16 changes: 15 additions & 1 deletion terraform/modules/tf_lambda/cloudwatch.tf
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ resource "aws_cloudwatch_event_target" "invoke_lambda_no_vpc" {
arn = "${aws_lambda_alias.alias_no_vpc.arn}"
}

// CloudWatch log group with configurable retention and tagging
// CloudWatch log group with configurable retention, tagging, and metric filters

resource "aws_cloudwatch_log_group" "lambda_log_group" {
count = "${var.enabled}"
Expand All @@ -33,6 +33,20 @@ resource "aws_cloudwatch_log_group" "lambda_log_group" {
}
}

// Custom CloudWatch metric filters applied to this function's log group.
// Each element of var.log_metric_filters is a CSV string made up of:
//   <filter_name>, <filter_pattern>, <value>
// NOTE(review): the resource label says "rule_processor" but this module is used for
// any Lambda function — confirm before renaming, since changing the label would move
// the resource's Terraform state address.
resource "aws_cloudwatch_log_metric_filter" "rule_processor_cw_metric_filters" {
  // One metric filter resource per entry in the list
  count          = "${length(var.log_metric_filters)}"
  name           = "${element(split(",", var.log_metric_filters[count.index]), 0)}"
  pattern        = "${element(split(",", var.log_metric_filters[count.index]), 1)}"
  log_group_name = "${aws_cloudwatch_log_group.lambda_log_group.name}"

  metric_transformation {
    // Published metric reuses the filter name, under a configurable namespace
    name      = "${element(split(",", var.log_metric_filters[count.index]), 0)}"
    namespace = "${var.log_metric_filter_namespace}"
    value     = "${element(split(",", var.log_metric_filters[count.index]), 2)}"
  }
}

// Generic CloudWatch metric alarms related to this function

resource "aws_cloudwatch_metric_alarm" "lambda_invocation_errors" {
Expand Down
11 changes: 11 additions & 0 deletions terraform/modules/tf_lambda/variables.tf
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,17 @@ variable "log_retention_days" {
description = "CloudWatch logs for the Lambda function will be retained for this many days"
}

// CloudWatch namespace under which metrics produced by the log metric filters are published
variable "log_metric_filter_namespace" {
  default     = "StreamAlert"
  description = "Namespace for metrics generated from metric filters"
}

// CSV-encoded metric filter definitions; consumed by the
// aws_cloudwatch_log_metric_filter resource in this module
variable "log_metric_filters" {
  type        = "list"
  default     = []
  description = "Metric filters applied to the log group. Each filter should be in the format \"filter_name,filter_pattern,value\""
}

// ***** CloudWatch metric alarms *****

variable "alarm_actions" {
Expand Down
2 changes: 2 additions & 0 deletions tests/unit/stream_alert_cli/terraform/test_alert_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ def test_generate_all_options(self):
'description': 'StreamAlert Alert Processor',
'environment_variables': {
'ALERTS_TABLE': 'unit-testing_streamalert_alerts',
'ENABLE_METRICS': '0',
'LOGGER_LEVEL': 'info'
},
'errors_alarm_enabled': True,
Expand Down Expand Up @@ -110,6 +111,7 @@ def test_generate_minimal_options(self):
'description': 'StreamAlert Alert Processor',
'environment_variables': {
'ALERTS_TABLE': 'unit-testing_streamalert_alerts',
'ENABLE_METRICS': '0',
'LOGGER_LEVEL': 'info'
},
'function_name': 'unit-testing_streamalert_alert_processor',
Expand Down
9 changes: 0 additions & 9 deletions tests/unit/stream_alert_shared/test_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,6 @@ def setup(self):
# Force reload the metrics package to trigger env var loading
reload(shared.metrics)

@patch('logging.Logger.error')
def test_invalid_metric_function(self, log_mock):
"""Metrics - Invalid Function Name"""
shared.metrics.MetricLogger.log_metric('rule_procesor', '', '')

log_mock.assert_called_with(
'Function \'%s\' not defined in available metrics. '
'Options are: %s', 'rule_procesor', '\'rule_processor\'')

@patch('logging.Logger.error')
def test_invalid_metric_name(self, log_mock):
"""Metrics - Invalid Metric Name"""
Expand Down