Skip to content

Commit

Permalink
Consolidate to a single alert processor (instead of clustered) (#603)
Browse files Browse the repository at this point in the history
  • Loading branch information
austinbyers authored and ryandeivert committed Mar 14, 2018
1 parent fc2ed12 commit 9ec0839
Show file tree
Hide file tree
Showing 40 changed files with 853 additions and 752 deletions.
10 changes: 2 additions & 8 deletions conf/clusters/prod.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,10 @@
}
},
"kinesis_events": {
"enabled": true,
"batch_size": 100
"batch_size": 100,
"enabled": true
},
"stream_alert": {
"alert_processor": {
"current_version": "$LATEST",
"log_level": "info",
"memory": 128,
"timeout": 60
},
"rule_processor": {
"current_version": "$LATEST",
"enable_metrics": true,
Expand Down
24 changes: 23 additions & 1 deletion conf/lambda.json
Original file line number Diff line number Diff line change
@@ -1,10 +1,32 @@
{
"alert_processor_config": {
"current_version": "$LATEST",
"handler": "stream_alert.alert_processor.main.handler",
"log_level": "info",
"log_retention_days": 14,
"memory": 128,
"metric_alarms": {
"enabled": true,
"errors_alarm_evaluation_periods": 1,
"errors_alarm_period_secs": 120,
"errors_alarm_threshold": 0,
"throttles_alarm_evaluation_periods": 1,
"throttles_alarm_period_secs": 120,
"throttles_alarm_threshold": 0
},
"outputs": {
"aws-lambda": [],
"aws-s3": []
},
"source_bucket": "PREFIX_GOES_HERE.streamalert.source",
"source_current_hash": "<auto_generated>",
"source_object_key": "<auto_generated>",
"third_party_libraries": []
"third_party_libraries": [],
"timeout": 60,
"vpc_config": {
"security_group_ids": [],
"subnet_ids": []
}
},
"rule_processor_config": {
"handler": "stream_alert.rule_processor.main.handler",
Expand Down
2 changes: 1 addition & 1 deletion conf/logs.json
Original file line number Diff line number Diff line change
Expand Up @@ -1496,4 +1496,4 @@
}
}
}
}
}
14 changes: 7 additions & 7 deletions conf/sources.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,17 @@
}
},
"stream_alert_app": {
"prefix_cluster_duo_auth_sm-app-name_app": {
"prefix_cluster_box_admin_events_sm-app-name_app": {
"logs": [
"duo"
"box"
]
},
"prefix_cluster_onelogin-events-app-name_app": {
"prefix_cluster_duo_admin_sm-app-name_app": {
"logs": [
"onelogin"
"duo"
]
},
"prefix_cluster_duo_admin_sm-app-name_app": {
"prefix_cluster_duo_auth_sm-app-name_app": {
"logs": [
"duo"
]
Expand All @@ -37,9 +37,9 @@
"gsuite"
]
},
"prefix_cluster_box_admin_events_sm-app-name_app": {
"prefix_cluster_onelogin-events-app-name_app": {
"logs": [
"box"
"onelogin"
]
}
}
Expand Down
7 changes: 3 additions & 4 deletions stream_alert/rule_processor/sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
limitations under the License.
"""
import json
import os

import boto3
from botocore.exceptions import ClientError
Expand All @@ -31,10 +32,8 @@ def __init__(self, env):
env (dict): loaded dictionary containing environment information
"""
self.env = env
self.client_lambda = boto3.client('lambda',
region_name=self.env['lambda_region'])
self.function = self.env['lambda_function_name'].replace(
'_streamalert_rule_processor', '_streamalert_alert_processor')
self.client_lambda = boto3.client('lambda', region_name=self.env['lambda_region'])
self.function = os.environ['ALERT_PROCESSOR']

def sink(self, alerts):
"""Sink triggered alerts from the StreamRules engine.
Expand Down
3 changes: 1 addition & 2 deletions stream_alert/shared/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@
# The FUNC_PREFIXES dict acts as a simple map to a human-readable name
# Add ATHENA_PARTITION_REFRESH_NAME: 'AthenaPartitionRefresh', to the
# below when metrics are supported there
FUNC_PREFIXES = {ALERT_PROCESSOR_NAME: 'AlertProcessor',
RULE_PROCESSOR_NAME: 'RuleProcessor'}
FUNC_PREFIXES = {RULE_PROCESSOR_NAME: 'RuleProcessor'}

try:
ENABLE_METRICS = bool(int(os.environ.get('ENABLE_METRICS', 0)))
Expand Down
40 changes: 22 additions & 18 deletions stream_alert_cli/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,17 +161,21 @@ def toggle_metrics(self, enabled, clusters, lambda_functions):
metrics on (rule, alert, or athena)
"""
for function in lambda_functions:
if function == metrics.ATHENA_PARTITION_REFRESH_NAME:
if function == metrics.ALERT_PROCESSOR_NAME:
self.config['lambda']['alert_processor_config']['enable_metrics'] = enabled

elif function == metrics.ATHENA_PARTITION_REFRESH_NAME:
if 'athena_partition_refresh_config' in self.config['lambda']:
self.config['lambda']['athena_partition_refresh_config'] \
['enable_metrics'] = enabled
else:
LOGGER_CLI.error('No Athena configuration found; please initialize first.')
continue

for cluster in clusters:
self.config['clusters'][cluster]['modules']['stream_alert'] \
[function]['enable_metrics'] = enabled
else:
# Rule processor - toggle for each cluster
for cluster in clusters:
self.config['clusters'][cluster]['modules']['stream_alert'] \
[function]['enable_metrics'] = enabled

self.write()

Expand Down Expand Up @@ -240,7 +244,7 @@ def _add_metric_alarm_per_cluster(self, alarm_info, function_name):
cluster.upper())

new_alarms = self._add_metric_alarm_config(alarm_settings, metric_alarms)
if new_alarms != False:
if new_alarms is not False:
function_config['metric_alarms'] = new_alarms
LOGGER_CLI.info('Successfully added \'%s\' metric alarm for the \'%s\' '
'function to \'conf/clusters/%s.json\'.',
Expand All @@ -259,7 +263,7 @@ def _alarm_exists(self, alarm_name):
message = ('CloudWatch metric alarm names must be unique '
'within each AWS account. Please remove this alarm '
'so it can be updated or choose another name.')
funcs = {metrics.ALERT_PROCESSOR_NAME, metrics.RULE_PROCESSOR_NAME}
funcs = {metrics.RULE_PROCESSOR_NAME}
for func in funcs:
for cluster in self.config['clusters']:
func_alarms = (
Expand All @@ -279,8 +283,8 @@ def _alarm_exists(self, alarm_name):
if not metric_alarms:
return False

# Check for athena metric alarms also, which are save in the global config
funcs.add(metrics.ATHENA_PARTITION_REFRESH_NAME)
# Check for functions saved in the global config.
funcs.update({metrics.ALERT_PROCESSOR_NAME, metrics.ATHENA_PARTITION_REFRESH_NAME})

for func in funcs:
global_func_alarms = global_config['metric_alarms'].get(func, {})
Expand Down Expand Up @@ -313,15 +317,15 @@ def add_metric_alarm(self, alarm_info):

# Do not continue if the user is trying to apply a metric alarm for an athena
# metric to a specific cluster (since the athena function operates on all clusters)
if (alarm_info['metric_target'] != 'aggregate'
and metric_function == metrics.ATHENA_PARTITION_REFRESH_NAME):
LOGGER_CLI.error('Metrics for the athena function can only be applied '
if (alarm_info['metric_target'] != 'aggregate' and metric_function in {
metrics.ALERT_PROCESSOR_NAME, metrics.ATHENA_PARTITION_REFRESH_NAME}):
LOGGER_CLI.error('Metrics for the athena and alert functions can only be applied '
'to an aggregate metric target, not on a per-cluster basis.')
return

# If the metric is related to either the rule processor or alert processor, we should
# If the metric is related to the rule processor, we should
# check to see if any cluster has metrics enabled for that function before continuing
if (metric_function in {metrics.ALERT_PROCESSOR_NAME, metrics.RULE_PROCESSOR_NAME} and
if (metric_function == metrics.RULE_PROCESSOR_NAME and
not any(self.config['clusters'][cluster]['modules']['stream_alert'][metric_function]
.get('enable_metrics') for cluster in self.config['clusters'])):
prompt = ('Metrics are not currently enabled for the \'{}\' function '
Expand Down Expand Up @@ -353,8 +357,8 @@ def add_metric_alarm(self, alarm_info):
return

# Add metric alarms for the aggregate metrics - these are added to the global config
if (alarm_info['metric_target'] == 'aggregate'
or metric_function == metrics.ATHENA_PARTITION_REFRESH_NAME):
if (alarm_info['metric_target'] == 'aggregate' or metric_function in {
metrics.ALERT_PROCESSOR_NAME, metrics.ATHENA_PARTITION_REFRESH_NAME}):
global_config = self.config['global']['infrastructure']['monitoring']

metric_alarms = global_config.get('metric_alarms', {})
Expand All @@ -371,7 +375,7 @@ def add_metric_alarm(self, alarm_info):
alarm_info['metric_name'])

new_alarms = self._add_metric_alarm_config(alarm_settings, metric_alarms)
if new_alarms != False:
if new_alarms is not False:
global_config['metric_alarms'][metric_function] = new_alarms
LOGGER_CLI.info('Successfully added \'%s\' metric alarm to '
'\'conf/global.json\'.', alarm_settings['alarm_name'])
Expand Down Expand Up @@ -543,7 +547,7 @@ def _config_reader(self, key, file_path, **kwargs):
else:
# For certain log types (csv), the order of the schema
# must be retained. By loading as an OrderedDict,
# the configuration is gauaranteed to keep its order.
# the configuration is guaranteed to keep its order.
if key == 'logs':
self.config[key] = json.load(data, object_pairs_hook=OrderedDict)
else:
Expand Down
2 changes: 1 addition & 1 deletion stream_alert_cli/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -587,7 +587,7 @@ def get_context_from_config(cluster, config):
prefix = config['global']['account']['prefix']
account = config['global']['account']['aws_account_id']
region = config['global']['account']['region']
function_name = '{}_{}_streamalert_alert_processor'.format(prefix, cluster)
function_name = '{}_streamalert_alert_processor'.format(prefix)
arn = 'arn:aws:lambda:{}:{}:function:{}:testing'.format(
region, account, function_name)

Expand Down
4 changes: 2 additions & 2 deletions stream_alert_cli/manage_lambda/deploy.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def _publish_version(packages, config, clusters):
Returns:
bool: Result of Lambda version publishing
"""
global_packages = {'athena_partition_refresh', 'threat_intel_downloader'}
global_packages = {'alert_processor', 'athena_partition_refresh', 'threat_intel_downloader'}

for package in packages:
if package.package_name in global_packages:
Expand Down Expand Up @@ -67,7 +67,7 @@ def _create_and_upload(function_name, config, cluster=None):
package_mapping = {
'alert': PackageMap(
stream_alert_packages.AlertProcessorPackage,
{'module.stream_alert_{}'.format(cluster) for cluster in clusters},
{'module.alert_processor_lambda'},
True),
'apps': PackageMap(
stream_alert_packages.AppIntegrationPackage,
Expand Down
120 changes: 97 additions & 23 deletions stream_alert_cli/manage_lambda/rollback.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,85 @@
limitations under the License.
"""
from stream_alert_cli import helpers
from stream_alert_cli.logger import LOGGER_CLI
from stream_alert_cli.terraform.generate import terraform_generate


def _decrement_version(lambda_config):
"""Decrement the Lambda version, if possible.
Args:
lambda_config (dict): Lambda function config with 'current_version'
Returns:
True if the version was changed, False otherwise
"""
current_version = lambda_config['current_version']
if current_version == '$LATEST':
return False

int_version = int(current_version)
if int_version <= 1:
return False

lambda_config['current_version'] = int_version - 1
return True


def _try_decrement_version(lambda_config, function_name):
    """Decrement a function's version, logging a warning if it cannot be.

    Args:
        lambda_config (dict): Lambda function config with 'current_version'
        function_name (str): Human-readable function name used in the warning

    Returns:
        bool: True if the version was decremented, False otherwise
    """
    changed = _decrement_version(lambda_config)
    if not changed:
        # Logger.warn is a deprecated alias; warning() is the supported name
        LOGGER_CLI.warning('%s cannot be rolled back from version %s',
                           function_name, str(lambda_config['current_version']))
    return changed


def _rollback_alert(config):
    """Decrement the current_version for the alert processor.

    Args:
        config (dict-like): Loaded StreamAlert config containing
            config['lambda']['alert_processor_config']

    Returns:
        list: Terraform module targets to apply; empty if no rollback occurred
    """
    lambda_config = config['lambda']['alert_processor_config']
    if _try_decrement_version(lambda_config, 'alert_processor'):
        return ['module.alert_processor_lambda']
    # Explicit empty list (instead of an implicit None) so callers can extend
    # the result directly, matching the other _rollback_* helpers
    return []


def _rollback_apps(config, clusters):
    """Decrement the current_version for all app functions in the given clusters.

    Args:
        config (dict-like): Loaded StreamAlert config
        clusters (iterable): Cluster names whose app functions should roll back

    Returns:
        list: Terraform module targets for each app function that rolled back
    """
    tf_targets = []

    for cluster in clusters:
        apps_config = config['clusters'][cluster]['modules'].get('stream_alert_apps', {})
        # dict.items() works on both Python 2 and 3; iteritems() is Py2-only
        for lambda_name, lambda_config in apps_config.items():
            clustered_name = '{}_{}'.format(lambda_name, cluster)
            if _try_decrement_version(lambda_config, clustered_name):
                tf_targets.append('module.{}'.format(clustered_name))

    return tf_targets


def _rollback_athena(config):
    """Decrement the current_version for the Athena Partition Refresh function.

    Args:
        config (dict-like): Loaded StreamAlert config; the Athena config key
            may be absent, in which case nothing is rolled back

    Returns:
        list: Terraform module targets to apply; empty if no rollback occurred
    """
    lambda_config = config['lambda'].get('athena_partition_refresh_config')
    if lambda_config and _try_decrement_version(lambda_config, 'athena_partition_refresh'):
        return ['module.stream_alert_athena']
    # Explicit empty list (instead of an implicit None) for consistency
    return []


def _rollback_downloader(config):
    """Decrement the current_version for the Threat Intel Downloader function.

    Args:
        config (dict-like): Loaded StreamAlert config; the downloader config
            key may be absent, in which case nothing is rolled back

    Returns:
        list: Terraform module targets to apply; empty if no rollback occurred
    """
    lambda_config = config['lambda'].get('threat_intel_downloader_config')
    # Bug fix: report the function name, not the config key
    # ('threat_intel_downloader_config'), in any rollback warning message
    if lambda_config and _try_decrement_version(lambda_config, 'threat_intel_downloader'):
        return ['module.threat_intel_downloader']
    return []


def _rollback_rule(config, clusters):
    """Decrement the current_version for the Rule Processor in each given cluster.

    Args:
        config (dict-like): Loaded StreamAlert config
        clusters (iterable): Cluster names whose rule processors should roll back

    Returns:
        list: Terraform module targets for each cluster that was rolled back
    """
    targets = []
    for cluster in clusters:
        modules = config['clusters'][cluster]['modules']
        rule_config = modules['stream_alert']['rule_processor']
        cluster_func = 'rule_processor_{}'.format(cluster)
        if _try_decrement_version(rule_config, cluster_func):
            targets.append('module.stream_alert_{}'.format(cluster))
    return targets


def rollback(options, config):
"""Rollback the current production AWS Lambda version by 1
Expand All @@ -25,32 +101,30 @@ def rollback(options, config):
Only rolls back if the published version is greater than 1
"""
clusters = options.clusters or config.clusters()
rollback_all = 'all' in options.processor
tf_targets = []

if 'all' in options.processor:
lambda_functions = {'rule_processor', 'alert_processor', 'athena_partition_refresh'}
else:
lambda_functions = {
'{}_processor'.format(proc)
for proc in options.processor if proc != 'athena'
}
if 'athena' in options.processor:
lambda_functions.add('athena_partition_refresh')
if rollback_all or 'alert' in options.processor:
tf_targets.extend(_rollback_alert(config) or [])

for cluster in clusters:
for lambda_function in lambda_functions:
stream_alert_key = config['clusters'][cluster]['modules']['stream_alert']
current_vers = stream_alert_key[lambda_function]['current_version']
if current_vers != '$LATEST':
current_vers = int(current_vers)
if current_vers > 1:
new_vers = current_vers - 1
config['clusters'][cluster]['modules']['stream_alert'][lambda_function][
'current_version'] = new_vers
config.write()

targets = ['module.stream_alert_{}'.format(x) for x in config.clusters()]
if rollback_all or 'apps' in options.processor:
tf_targets.extend(_rollback_apps(config, clusters) or [])

if rollback_all or 'athena' in options.processor:
tf_targets.extend(_rollback_athena(config) or [])

if rollback_all or 'rule' in options.processor:
tf_targets.extend(_rollback_rule(config, clusters) or [])

if rollback_all or 'threat_intel_downloader' in options.processor:
tf_targets.extend(_rollback_downloader(config) or [])

if not tf_targets: # No changes made
return

config.write()

if not terraform_generate(config=config):
return

helpers.tf_runner(targets=targets)
helpers.tf_runner(targets=sorted(tf_targets))
Loading

0 comments on commit 9ec0839

Please sign in to comment.