config consolidation/removal of tech debt (#707)
* adding new shared config loading functionality

* updating alert processor to use new config loading func

* updates to some alert processor unit tests

* updates to threat intel downloader to use new config loading func

* updates to threat intel downloader unit tests

* adding tests for new config loading logic

* updates to athena partition refresh to use new config loading func

* updates to athena partition refresh unit tests

* updating unit test lambda.json config file

* updates to rule processor to use new config loading func

* updates to rule processor unit tests

* removing unnecessary logic related to config from CLI

* updating the CLIConfig class to use the shared config loading logic

* fixing test failures

* addressing pr feedback
ryandeivert authored Apr 26, 2018
1 parent 0e43516 commit e59eb90
Showing 40 changed files with 552 additions and 789 deletions.
8 changes: 4 additions & 4 deletions conf/global.json
@@ -6,11 +6,11 @@
     "region": "us-east-1"
   },
   "general": {
-    "rule_locations": [
-      "rules"
-    ],
     "matcher_locations": [
       "matchers"
+    ],
+    "rule_locations": [
+      "rules"
     ]
   },
   "infrastructure": {
@@ -39,4 +39,4 @@
     "tfstate_s3_key": "stream_alert_state/terraform.tfstate",
     "tfvars": "terraform.tfvars"
   }
-}
+}
2 changes: 1 addition & 1 deletion conf/lambda.json
@@ -87,4 +87,4 @@
     "source_current_hash": "<auto_generated>",
     "source_object_key": "<auto_generated>"
   }
-}
+}
6 changes: 2 additions & 4 deletions stream_alert/alert_processor/main.py
@@ -14,14 +14,14 @@
 limitations under the License.
 """
 from __future__ import absolute_import  # Suppresses RuntimeWarning import error in Lambda
-import json
 import os
 
 from stream_alert.alert_processor import LOGGER
 from stream_alert.alert_processor.outputs.output_base import StreamAlertOutput
 from stream_alert.shared import backoff_handlers, NORMALIZATION_KEY, resources
 from stream_alert.shared.alert import Alert, AlertCreationError
 from stream_alert.shared.alert_table import AlertTable
+from stream_alert.shared.config import load_config
 
 import backoff
 from botocore.exceptions import ClientError
@@ -31,7 +31,6 @@ class AlertProcessor(object):
     """Orchestrates delivery of alerts to the appropriate dispatchers."""
     ALERT_PROCESSOR = None  # AlertProcessor instance which can be re-used across Lambda invocations
     BACKOFF_MAX_TRIES = 5
-    OUTPUT_CONFIG_PATH = 'conf/outputs.json'
 
     @classmethod
     def get_instance(cls, invoked_function_arn):
@@ -54,8 +53,7 @@ def __init__(self, invoked_function_arn):
         self.prefix = split_arn[6].split('_')[0]
 
         # Merge user-specified output configuration with the required output configuration
-        with open(self.OUTPUT_CONFIG_PATH) as f:
-            output_config = json.load(f)
+        output_config = load_config(include={'outputs.json'})['outputs']
         self.config = resources.merge_required_outputs(output_config, self.prefix)
 
         self.alerts_table = AlertTable(os.environ['ALERTS_TABLE'])
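The shared loader referenced above lives in stream_alert/shared/config.py, which is among the changed files not expanded in this view. The following is only an illustrative sketch of the calling convention visible in this diff, where load_config(include={'outputs.json'})['outputs'] returns settings keyed by config file basename; the conf/ directory layout and error handling are assumptions, not the project's actual implementation.

# Illustrative sketch only: approximates a shared loader with an `include`
# filter that returns settings keyed by file basename. The real
# stream_alert/shared/config.py is not shown in this excerpt.
import json
import os


class ConfigError(Exception):
    """Raised for missing or invalid config files (assumed behavior)."""


def load_config(conf_dir='conf/', include=None, validate=False):
    """Load JSON files from conf_dir into a dict keyed by file name sans extension."""
    config = {}
    for file_name in sorted(os.listdir(conf_dir)):
        if not file_name.endswith('.json'):
            continue  # skip any non-JSON files in the conf directory
        if include is not None and file_name not in include:
            continue  # honor the include filter, e.g. {'outputs.json'}

        path = os.path.join(conf_dir, file_name)
        try:
            with open(path) as config_file:
                config[os.path.splitext(file_name)[0]] = json.load(config_file)
        except ValueError:
            raise ConfigError('The \'{}\' config file is not valid JSON'.format(path))

    if include:
        missing = include - {key + '.json' for key in config}
        if missing:
            raise ConfigError('Could not load config file(s): {}'.format(', '.join(sorted(missing))))

    if validate:
        pass  # the real loader performs schema validation here; omitted in this sketch

    return config

Under those assumptions, load_config(include={'outputs.json'})['outputs'] yields the same dict that the removed open/json.load of conf/outputs.json produced.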
37 changes: 2 additions & 35 deletions stream_alert/athena_partition_refresh/main.py
@@ -30,40 +30,7 @@
     giveup_handler,
     success_handler
 )
-
-
-def _load_config():
-    """Load the StreamAlert Athena configuration files
-
-    Returns:
-        dict: Configuration settings by file, includes two keys:
-            lambda, All lambda function settings
-            global, StreamAlert global settings
-
-    Raises:
-        ConfigError: For invalid or missing configuration files.
-    """
-    config_files = ('lambda', 'global')
-    config = {}
-    for config_file in config_files:
-        config_file_path = 'conf/{}.json'.format(config_file)
-
-        if not os.path.exists(config_file_path):
-            raise ConfigError('The \'{}\' config file was not found'.format(
-                config_file_path))
-
-        with open(config_file_path) as config_fh:
-            try:
-                config[config_file] = json.load(config_fh)
-            except ValueError:
-                raise ConfigError('The \'{}\' config file is not valid JSON'.format(
-                    config_file))
-
-    return config
-
-
-class ConfigError(Exception):
-    """Custom StreamAlertAthena Config Exception Class"""
+from stream_alert.shared.config import load_config
 
 
 class AthenaPartitionRefreshError(Exception):
@@ -540,7 +507,7 @@ def unique_s3_buckets_and_keys(self):
 
 def handler(*_):
     """Athena Partition Refresher Handler Function"""
-    config = _load_config()
+    config = load_config(include={'lambda.json', 'global.json'})
 
     # Initialize the SQS client and recieve messages
     stream_alert_sqs = StreamAlertSQSClient(config)
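The docstring of the deleted _load_config notes that it returned a dict keyed 'lambda' and 'global'; the shared loader preserves that shape (file basename as the key), so StreamAlertSQSClient(config) continues to receive the same structure. A brief usage sketch follows; the nested key paths are assumptions for illustration, not taken from this diff.

# Usage sketch mirroring the handler change above; keys are config file basenames.
config = load_config(include={'lambda.json', 'global.json'})
region = config['global']['account']['region']  # nested 'account'/'region' path is assumed
lambda_settings = config['lambda']              # settings for the Lambda functions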
136 changes: 0 additions & 136 deletions stream_alert/rule_processor/config.py

This file was deleted.

13 changes: 6 additions & 7 deletions stream_alert/rule_processor/handler.py
@@ -19,11 +19,10 @@
 from stream_alert.rule_processor import FUNCTION_NAME, LOGGER
 from stream_alert.rule_processor.alert_forward import AlertForwarder
 from stream_alert.rule_processor.classifier import StreamClassifier
-from stream_alert.rule_processor.config import load_config, load_env
 from stream_alert.rule_processor.firehose import StreamAlertFirehose
 from stream_alert.rule_processor.payload import load_stream_payload
 from stream_alert.rule_processor.rules_engine import RulesEngine
-from stream_alert.shared import stats
+from stream_alert.shared import config, stats
 from stream_alert.shared.metrics import MetricLogger
 
 
@@ -39,11 +38,11 @@ def __init__(self, context):
                 executing lambda function.
         """
         # Load the config. Validation occurs during load, which will
-        # raise exceptions on any ConfigErrors
-        StreamAlert.config = StreamAlert.config or load_config()
+        # raise exceptions on any ConfigError
+        StreamAlert.config = StreamAlert.config or config.load_config(validate=True)
 
         # Load the environment from the context arn
-        self.env = load_env(context)
+        self.env = config.parse_lambda_arn(context.invoked_function_arn)
 
         # Instantiate the send_alerts here to handle sending the triggered alerts to the
         # alert processor
@@ -162,7 +161,7 @@ def run(self, event):
         # Only log rule info here if this is not running tests
         # During testing, this gets logged at the end and printing here could be confusing
         # since stress testing calls this method multiple times
-        if self.env['lambda_alias'] != 'development':
+        if self.env['qualifier'] != 'development':
             stats.print_rule_stats(True)
 
         return self._failed_record_count == 0
@@ -184,7 +183,7 @@ def _process_alerts(self, payload):
             self._processed_size += len(record.pre_parsed_record)
             self.classifier.classify_record(record)
             if not record.valid:
-                if self.env['lambda_alias'] != 'development':
+                if self.env['qualifier'] != 'development':
                     LOGGER.error('Record does not match any defined schemas: %s\n%s',
                                  record, record.pre_parsed_record)
 
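handler.py now derives its runtime environment from the Lambda context ARN via config.parse_lambda_arn, and the resulting dict is read through a 'qualifier' key (the function alias) rather than the old 'lambda_alias'. Below is a minimal sketch of such a parser, assuming the standard arn:aws:lambda:<region>:<account-id>:function:<name>:<qualifier> layout; only the 'qualifier' key is confirmed by this diff, and the other key names and the default value are assumptions.

# Minimal sketch, not the StreamAlert implementation.
def parse_lambda_arn(function_arn):
    """Split a Lambda invocation ARN into its environment components."""
    split_arn = function_arn.split(':')
    return {
        'region': split_arn[3],         # assumed key name
        'account_id': split_arn[4],     # assumed key name
        'function_name': split_arn[6],  # assumed key name
        # 'qualifier' is the key the handler compares against 'development';
        # the fallback for unqualified ARNs is an assumption
        'qualifier': split_arn[7] if len(split_arn) == 8 else 'production'
    }


# Example: an alias-qualified invocation ARN yields qualifier == 'development'
env = parse_lambda_arn(
    'arn:aws:lambda:us-east-1:123456789012:function:prefix_streamalert_rule_processor:development')
assert env['qualifier'] == 'development'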