
Merge pull request #520 from airbnb/jacknaglieri-deploy-by-cluster
Deploy by Cluster, Batch Size Param, and More
jacknagz authored Dec 12, 2017
2 parents 112d937 + 7c97721 commit ab8e68a
Showing 11 changed files with 267 additions and 268 deletions.
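Based on the flags added below, per-cluster operation would presumably be invoked along these lines (the build and deploy subcommand names, the --processor value, and the cluster names are illustrative assumptions, not text from this diff):

    manage.py terraform build --target kinesis --clusters prod corp
    manage.py lambda deploy --processor rule --clusters prod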
14 changes: 14 additions & 0 deletions manage.py
@@ -976,6 +976,11 @@ def _add_default_lambda_args(lambda_parser):
action=UniqueSetAction,
required=True)

lambda_parser.add_argument(
'--clusters',
help=ARGPARSE_SUPPRESS,
nargs='+')

# Allow verbose output for the CLI with the --debug option
lambda_parser.add_argument('--debug', action='store_true', help=ARGPARSE_SUPPRESS)

@@ -1002,6 +1007,8 @@ def _add_terraform_subparser(subparsers):
--target The Terraform module name to apply.
Valid options: stream_alert, kinesis, kinesis_events,
cloudtrail, monitoring, and s3_events.
--clusters The StreamAlert cluster(s) to apply to.
Examples:
manage.py terraform init
@@ -1041,6 +1048,13 @@ def _add_terraform_subparser(subparsers):
help=ARGPARSE_SUPPRESS,
nargs='+')

tf_parser.add_argument(
'--clusters',
action=UniqueSetAction,
default=set(),
help=ARGPARSE_SUPPRESS,
nargs='+')

tf_parser.add_argument('--debug', action='store_true', help=ARGPARSE_SUPPRESS)
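UniqueSetAction is imported from elsewhere in the repo and is not part of this diff. A minimal sketch of such an argparse action, assuming it does nothing more than de-duplicate the supplied values into a set (the real implementation may differ):

    import argparse

    class UniqueSetAction(argparse.Action):
        """Store nargs='+' values on the namespace as a de-duplicated set."""

        def __call__(self, parser, namespace, values, option_string=None):
            setattr(namespace, self.dest, set(values))

With nargs='+', an invocation such as --clusters prod prod corp would then leave {'prod', 'corp'} on the parsed namespace.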


97 changes: 47 additions & 50 deletions stream_alert_cli/config.py
@@ -28,6 +28,7 @@
class CLIConfigError(Exception):
pass


class CLIConfig(object):
"""A class to load, modify, and display the StreamAlertCLI Config"""
DEFAULT_CONFIG_PATH = 'conf'
@@ -123,8 +124,11 @@ def set_prefix(self, prefix):
'alert_processor_config']['source_bucket'].replace('PREFIX_GOES_HERE', prefix)
self.config['lambda']['rule_processor_config']['source_bucket'] = self.config['lambda'][
'rule_processor_config']['source_bucket'].replace('PREFIX_GOES_HERE', prefix)
self.config['lambda']['stream_alert_apps_config']['source_bucket'] = self.config['lambda'][
'stream_alert_apps_config']['source_bucket'].replace('PREFIX_GOES_HERE', prefix)

if self.config['lambda'].get('stream_alert_apps_config'):
self.config['lambda']['stream_alert_apps_config']['source_bucket'] = self.config[
'lambda']['stream_alert_apps_config']['source_bucket'].replace(
'PREFIX_GOES_HERE', prefix)

if self.config['lambda'].get('threat_intel_downloader_config'):
self.config['lambda']['threat_intel_downloader_config']['source_bucket'] = \
@@ -187,8 +191,8 @@ def _add_metric_alarm_config(alarm_info, current_alarms):
omitted_keys = {'debug', 'alarm_name', 'command', 'clusters', 'metric_target'}

current_alarms[alarm_info['alarm_name']] = {
key: value for key, value in alarm_info.iteritems()
if key not in omitted_keys
key: value
for key, value in alarm_info.iteritems() if key not in omitted_keys
}

return current_alarms
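As an illustration of the comprehension above (all values hypothetical), the bookkeeping keys supplied by the CLI are dropped and only the alarm settings themselves are stored:

    alarm_info = {'alarm_name': 'high_failed_parses', 'command': 'create-alarm',
                  'debug': False, 'clusters': ['prod'], 'metric_target': 'cluster',
                  'metric_name': 'FailedParses', 'threshold': 1.0}
    # current_alarms['high_failed_parses'] would then contain only:
    # {'metric_name': 'FailedParses', 'threshold': 1.0}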
@@ -205,14 +209,14 @@ def _add_metric_alarm_per_cluster(self, alarm_info, function_name):
"""
# If no clusters have been specified by the user, we can assume this alarm
# should be created for all available clusters, so fall back to that
clusters = (alarm_info['clusters'] if alarm_info['clusters'] else
list(self.config['clusters']))
clusters = (alarm_info['clusters']
if alarm_info['clusters'] else list(self.config['clusters']))

# Go over each of the clusters and see if enable_metrics == True and prompt
# the user to toggle metrics on if this is False
for cluster in clusters:
function_config = (self.config['clusters'][cluster]['modules']
['stream_alert'][function_name])
function_config = (
self.config['clusters'][cluster]['modules']['stream_alert'][function_name])

if not function_config.get('enable_metrics'):
prompt = ('Metrics are not currently enabled for the \'{}\' function '
@@ -223,7 +227,7 @@ def _add_metric_alarm_per_cluster(self, alarm_info, function_name):
self.toggle_metrics(True, [cluster], [function_name])

elif not continue_prompt(message='Would you still like to add this alarm '
'even though metrics are disabled?'):
'even though metrics are disabled?'):
continue

metric_alarms = function_config.get('metric_alarms', {})
@@ -258,12 +262,13 @@ def _alarm_exists(self, alarm_name):
funcs = {metrics.ALERT_PROCESSOR_NAME, metrics.RULE_PROCESSOR_NAME}
for func in funcs:
for cluster in self.config['clusters']:
func_alarms = (self.config['clusters'][cluster]['modules']
['stream_alert'][func].get('metric_alarms', {}))
func_alarms = (
self.config['clusters'][cluster]['modules']['stream_alert'][func].get(
'metric_alarms', {}))
if alarm_name in func_alarms:
LOGGER_CLI.error('An alarm with name \'%s\' already exists in the '
'\'conf/clusters/%s.json\' cluster. %s', alarm_name,
cluster, message)
'\'conf/clusters/%s.json\' cluster. %s', alarm_name, cluster,
message)
return True

global_config = self.config['global']['infrastructure'].get('monitoring')
@@ -301,23 +306,24 @@ def add_metric_alarm(self, alarm_info):
current_metrics = metrics.MetricLogger.get_available_metrics()

# Extract the function name this metric is associated with
metric_function = {metric: function for function in current_metrics
for metric in current_metrics[function]}[alarm_info['metric_name']]
metric_function = {
metric: function
for function in current_metrics for metric in current_metrics[function]
}[alarm_info['metric_name']]

# Do not continue if the user is trying to apply a metric alarm for an athena
# metric to a specific cluster (since the athena function operates on all clusters)
if (alarm_info['metric_target'] != 'aggregate' and
metric_function == metrics.ATHENA_PARTITION_REFRESH_NAME):
if (alarm_info['metric_target'] != 'aggregate'
and metric_function == metrics.ATHENA_PARTITION_REFRESH_NAME):
LOGGER_CLI.error('Metrics for the athena function can only be applied '
'to an aggregate metric target, not on a per-cluster basis.')
return

# If the metric is related to either the rule processor or alert processor, we should
# check to see if any cluster has metrics enabled for that function before continuing
if (metric_function in {metrics.ALERT_PROCESSOR_NAME, metrics.RULE_PROCESSOR_NAME} and
not any(self.config['clusters'][cluster]['modules']['stream_alert']
[metric_function].get('enable_metrics') for cluster in
self.config['clusters'])):
not any(self.config['clusters'][cluster]['modules']['stream_alert'][metric_function]
.get('enable_metrics') for cluster in self.config['clusters'])):
prompt = ('Metrics are not currently enabled for the \'{}\' function '
'within any cluster. Creating an alarm will have no effect '
'until metrics are enabled for this function in at least one '
@@ -343,12 +349,12 @@ def add_metric_alarm(self, alarm_info):
self.toggle_metrics(True, None, [metric_function])

elif not continue_prompt(message='Would you still like to add this alarm '
'even though metrics are disabled?'):
'even though metrics are disabled?'):
return

# Add metric alarms for the aggregate metrics - these are added to the global config
if (alarm_info['metric_target'] == 'aggregate' or
metric_function == metrics.ATHENA_PARTITION_REFRESH_NAME):
if (alarm_info['metric_target'] == 'aggregate'
or metric_function == metrics.ATHENA_PARTITION_REFRESH_NAME):
global_config = self.config['global']['infrastructure']['monitoring']

metric_alarms = global_config.get('metric_alarms', {})
@@ -417,12 +423,11 @@ def add_app_integration(self, app_info):
app_info['current_version'] = '$LATEST'
local_config_keys.update({'log_level', 'current_version', 'type'})

apps_config[app_info['app_name']] = {key: app_info[key]
for key in local_config_keys}
apps_config[app_info['app_name']] = {key: app_info[key] for key in local_config_keys}
else:
apps_config[app_info['app_name']].update({key: app_info[key]
for key in local_config_keys})

apps_config[app_info['app_name']].update(
{key: app_info[key]
for key in local_config_keys})

cluster_config['modules']['stream_alert_apps'] = apps_config

@@ -433,8 +438,8 @@ def add_app_integration(self, app_info):
self.config['sources']['stream_alert_app'] = app_sources

LOGGER_CLI.info('Successfully added \'%s\' app integration to \'conf/clusters/%s.json\' '
'for service \'%s\'.', app_info['app_name'],
app_info['cluster'], app_info['type'])
'for service \'%s\'.', app_info['app_name'], app_info['cluster'],
app_info['type'])

self.write()

@@ -489,9 +494,7 @@ def add_threat_intel_downloader(self, ti_downloader_info):
'source_bucket': 'PREFIX_GOES_HERE.streamalert.source',
'source_current_hash': '<auto_generated>',
'source_object_key': '<auto_generated>',
'third_party_libraries': [
'requests'
],
'third_party_libraries': ['requests'],
'timeout': '120',
'table_rcu': 10,
'table_wcu': 10,
@@ -538,8 +541,7 @@ def _config_reader(self, key, file_path, **kwargs):
# must be retained. By loading as an OrderedDict,
# the configuration is guaranteed to keep its order.
if key == 'logs':
self.config[key] = json.load(data,
object_pairs_hook=OrderedDict)
self.config[key] = json.load(data, object_pairs_hook=OrderedDict)
else:
self.config[key] = json.load(data)
except ValueError:
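An aside on the object_pairs_hook above: on Python 2 (which this code targets, per the iteritems() calls elsewhere in the file), json.load returns an unordered dict, so the hook is what actually preserves the on-disk ordering of the logs configuration. A small illustration with hypothetical log names:

    import json
    from collections import OrderedDict

    raw = '{"osquery:info": {}, "cloudwatch:events": {}}'
    print(list(json.loads(raw, object_pairs_hook=OrderedDict)))
    # ['osquery:info', 'cloudwatch:events'] -- order preserved from the file;
    # a plain dict on Python 2 makes no such guarantee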
@@ -548,11 +550,12 @@ def _config_reader(self, key, file_path, **kwargs):
@staticmethod
def _config_writer(config, path, **kwargs):
with open(path, 'r+') as conf_file:
json.dump(config,
conf_file,
indent=2,
separators=(',', ': '),
sort_keys=kwargs.get('sort_keys', True))
json.dump(
config,
conf_file,
indent=2,
separators=(',', ': '),
sort_keys=kwargs.get('sort_keys', True))
conf_file.truncate()
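A note on the write pattern above: the config file is opened 'r+' and rewritten in place, so the trailing conf_file.truncate() is what discards any leftover bytes whenever the newly dumped JSON is shorter than the previous contents; without it, stale characters from the old file would remain after the new JSON.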

def load(self):
Expand All @@ -574,19 +577,13 @@ def write(self):
"""Write the current config in memory to disk"""
# Write loaded configuration files
for config_key in [key for key in self.config if key != 'clusters']:
file_path = os.path.join(self.config_path,
'{}.json'.format(config_key))
file_path = os.path.join(self.config_path, '{}.json'.format(config_key))
if config_key == 'logs':
self._config_writer(self.config[config_key],
file_path,
sort_keys=False)
self._config_writer(self.config[config_key], file_path, sort_keys=False)
else:
self._config_writer(self.config[config_key], file_path)

# Write loaded cluster files
for cluster_key in self.config['clusters']:
file_path = os.path.join(self.config_path,
'clusters',
'{}.json'.format(cluster_key))
self._config_writer(self.config['clusters'][cluster_key],
file_path)
file_path = os.path.join(self.config_path, 'clusters', '{}.json'.format(cluster_key))
self._config_writer(self.config['clusters'][cluster_key], file_path)
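For orientation, write() as shown above emits one JSON file per top-level config key plus one file per cluster, giving roughly the following layout under the default conf/ path (the prod cluster name is hypothetical; the other names are keys referenced in this file):

    conf/
        clusters/
            prod.json
        global.json
        lambda.json
        logs.json
        sources.json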