From 75479ec6fa63cedcc5dcf5c167f212f0e14f0943 Mon Sep 17 00:00:00 2001
From: Ryan Deivert
Date: Wed, 25 Apr 2018 22:19:56 -0700
Subject: [PATCH 01/15] adding new shared config loading functionality

---
 stream_alert/shared/config.py | 161 ++++++++++++++++++++++++++++++++++
 1 file changed, 161 insertions(+)
 create mode 100644 stream_alert/shared/config.py

diff --git a/stream_alert/shared/config.py b/stream_alert/shared/config.py
new file mode 100644
index 000000000..24f9eedeb
--- /dev/null
+++ b/stream_alert/shared/config.py
@@ -0,0 +1,161 @@
+"""
+Copyright 2017-present, Airbnb Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+from collections import defaultdict, OrderedDict
+import json
+import os
+
+
+class ConfigError(Exception):
+    """Exception class for config file errors"""
+
+
+def parse_lambda_arn(function_arn):
+    """Extract info on the current environment from the lambda function ARN
+
+    Parses the given Lambda function ARN to get the name of the currently
+    running alias (either production or development) and the name of the
+    function.
+
+    Example:
+        arn:aws:lambda:aws-region:acct-id:function:stream_alert:production
+
+    Arguments:
+        function_arn (str): The AWS Lambda function ARN
+
+    Returns:
+        dict:
+            {
+                'region': 'region_name',
+                'account_id': 'account_id',
+                'function_name': 'function_name',
+                'qualifier': 'qualifier'
+            }
+    """
+    split_arn = function_arn.split(':')
+    return {
+        'region': split_arn[3],
+        'account_id': split_arn[4],
+        'function_name': split_arn[6],
+        'qualifier': split_arn[7] if len(split_arn) == 8 else None  # optional qualifier
+    }
+
+
+def load_config(conf_dir='conf/', **kwargs):
+    """Load the configuration for StreamAlert.
+
+    All configuration files live in the `conf` directory in JSON format.
+    `sources` defines a collection of AWS services (S3, Kinesis) supported as
+    inputs to StreamAlert, specific entities (S3 buckets, Kinesis streams),
+    and log types emitted from them.
+
+    `logs` declares the schema for the listed log types in `sources`. Each
+    key denotes the name of the log type, and includes 'keys' used to match
+    rules to log fields.
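+
+    Example:
+        # A usage sketch; 'osquery' is a placeholder log name
+        config = load_config(include={'logs.json'}, validate=True)
+        schema = config['logs']['osquery']['schema']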
+
+    Arguments:
+        conf_dir (str): [optional] Path from which to load the config
+
+    Keyword Arguments:
+        exclude (set): Names of config files or folders that should not be loaded
+        include (set): Names of specific config files to load, excluding all others
+        validate (bool): Validate aspects of the config to check for user error
+
+    """
+    default_files = {file for file in os.listdir(conf_dir) if file.endswith('.json')}
+    conf_files = kwargs.get('include', default_files).copy()
+    include_clusters = 'clusters' in conf_files
+
+    conf_files.intersection_update(default_files)
+    if not (conf_files or include_clusters):
+        raise ConfigError('No config files to load')
+
+    exclusions = kwargs.get('exclude', set())
+    conf_files = conf_files.difference(exclusions)
+
+    config = defaultdict(dict)
+    for name in conf_files:
+        path = os.path.join(conf_dir, name)
+        # we use object_pairs_hook=OrderedDict to preserve schema order for CSV/KV log types
+        config[os.path.splitext(name)[0]] = _load_json_file(path, True)
+
+    # Load the cluster configs if they are not excluded
+    if ('clusters' not in exclusions and not kwargs.get('include')) or include_clusters:
+        clusters = {file for file in os.listdir(os.path.join(conf_dir, 'clusters'))
+                    if file.endswith('.json')}
+        for cluster in clusters:
+            cluster_path = os.path.join(conf_dir, 'clusters', cluster)
+            config['clusters'][os.path.splitext(cluster)[0]] = _load_json_file(cluster_path)
+
+    if kwargs.get('validate'):
+        _validate_config(config)
+
+    return config
+
+def _load_json_file(path, ordered=False):
+    """Helper to return the loaded json from a given path"""
+    kwargs = {'object_pairs_hook': OrderedDict if ordered else None}
+    with open(path) as data:
+        try:
+            return json.load(data, **kwargs)
+        except ValueError:
+            raise ConfigError('Invalid JSON format for {}'.format(path))
+
+
+def _validate_config(config):
+    """Validate the StreamAlert configuration contains a valid structure.
+
+    Checks for `logs.json`:
+        - each log has a schema and parser declared
+    Checks for `sources.json`:
+        - each source is a supported service (kinesis, s3, sns, stream_alert_app)
+        - each source entity has a list of logs declared
+    """
+    # Check the log declarations
+    if 'logs' in config:
+
+        for log, attrs in config['logs'].iteritems():
+            if 'schema' not in attrs:
+                raise ConfigError('The \'schema\' is missing for {}'.format(log))
+
+            if 'parser' not in attrs:
+                raise ConfigError('The \'parser\' is missing for {}'.format(log))
+
+    # Check if the defined sources are supported and report any invalid entries
+    if 'sources' in config:
+        supported_sources = {'kinesis', 's3', 'sns', 'stream_alert_app'}
+        if not set(config['sources']).issubset(supported_sources):
+            missing_sources = set(config['sources']) - supported_sources
+            raise ConfigError(
+                'The \'sources.json\' file contains invalid source entries: {}. 
' + 'The following sources are supported: {}'.format( + ', '.join('\'{}\''.format(source) for source in missing_sources), + ', '.join('\'{}\''.format(source) for source in supported_sources) + ) + ) + + # Iterate over each defined source and make sure the required subkeys exist + for attrs in config['sources'].values(): + for entity, entity_attrs in attrs.iteritems(): + if 'logs' not in entity_attrs: + raise ConfigError('Missing \'logs\' key for entity: {}'.format(entity)) + + if not entity_attrs['logs']: + raise ConfigError('List of \'logs\' is empty for entity: {}'.format(entity)) From e60dc26cbef1dfcff6e1c1a1192316218b8a1b10 Mon Sep 17 00:00:00 2001 From: Ryan Deivert Date: Wed, 25 Apr 2018 22:20:53 -0700 Subject: [PATCH 02/15] updating alert processor to use new config loading func --- stream_alert/alert_processor/main.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/stream_alert/alert_processor/main.py b/stream_alert/alert_processor/main.py index 2348ca015..1c0fb317d 100644 --- a/stream_alert/alert_processor/main.py +++ b/stream_alert/alert_processor/main.py @@ -22,6 +22,7 @@ from stream_alert.shared import backoff_handlers, NORMALIZATION_KEY, resources from stream_alert.shared.alert import Alert, AlertCreationError from stream_alert.shared.alert_table import AlertTable +from stream_alert.shared.config import load_config import backoff from botocore.exceptions import ClientError @@ -31,7 +32,6 @@ class AlertProcessor(object): """Orchestrates delivery of alerts to the appropriate dispatchers.""" ALERT_PROCESSOR = None # AlertProcessor instance which can be re-used across Lambda invocations BACKOFF_MAX_TRIES = 5 - OUTPUT_CONFIG_PATH = 'conf/outputs.json' @classmethod def get_instance(cls, invoked_function_arn): @@ -54,8 +54,7 @@ def __init__(self, invoked_function_arn): self.prefix = split_arn[6].split('_')[0] # Merge user-specified output configuration with the required output configuration - with open(self.OUTPUT_CONFIG_PATH) as f: - output_config = json.load(f) + output_config = load_config(include={'outputs.json'})['outputs'] self.config = resources.merge_required_outputs(output_config, self.prefix) self.alerts_table = AlertTable(os.environ['ALERTS_TABLE']) From 98c653efdd563b1f2765bfa1aac24dfd7c1af6a5 Mon Sep 17 00:00:00 2001 From: Ryan Deivert Date: Wed, 25 Apr 2018 22:22:48 -0700 Subject: [PATCH 03/15] updates to some alert processor unit tests --- .../unit/stream_alert_alert_processor/__init__.py | 5 ++--- .../stream_alert_alert_processor/test_main.py | 8 +++++--- .../test_outputs/test_aws.py | 7 ++++++- .../test_outputs/test_github.py | 10 +++++++--- .../test_outputs/test_jira.py | 10 +++++++--- .../test_outputs/test_komand.py | 10 +++++++--- .../test_outputs/test_output_base.py | 9 +++++++-- .../test_outputs/test_pagerduty.py | 15 ++++++++++----- .../test_outputs/test_phantom.py | 11 ++++++++--- .../test_outputs/test_slack.py | 10 +++++++--- 10 files changed, 66 insertions(+), 29 deletions(-) diff --git a/tests/unit/stream_alert_alert_processor/__init__.py b/tests/unit/stream_alert_alert_processor/__init__.py index cb85de406..2aeaf58c4 100644 --- a/tests/unit/stream_alert_alert_processor/__init__.py +++ b/tests/unit/stream_alert_alert_processor/__init__.py @@ -13,15 +13,14 @@ import json from stream_alert.shared import resources +from stream_alert.shared.config import load_config REGION = 'us-east-1' ACCOUNT_ID = '123456789012' PREFIX = 'prefix' FUNCTION_NAME = '{}_streamalert_alert_processor'.format(PREFIX) -OUTPUT_CONFIG_PATH = 
'tests/unit/conf/outputs.json' -with open(OUTPUT_CONFIG_PATH) as f: - base_config = json.load(f) +base_config = load_config('tests/unit/conf/', include={'outputs.json'})['outputs'] CONFIG = resources.merge_required_outputs(base_config, PREFIX) ALERTS_TABLE = '{}_streamalert_alerts'.format(PREFIX) diff --git a/tests/unit/stream_alert_alert_processor/test_main.py b/tests/unit/stream_alert_alert_processor/test_main.py index 2f652798f..a63683c14 100644 --- a/tests/unit/stream_alert_alert_processor/test_main.py +++ b/tests/unit/stream_alert_alert_processor/test_main.py @@ -15,7 +15,7 @@ """ import os -from mock import ANY, call, MagicMock, patch +from mock import ANY, call, MagicMock, Mock, patch from nose.tools import ( assert_equal, assert_false, @@ -28,8 +28,9 @@ from stream_alert.alert_processor.outputs.output_base import OutputDispatcher from stream_alert.shared import NORMALIZATION_KEY from stream_alert.shared.alert import Alert +from stream_alert.shared.config import load_config from tests.unit.stream_alert_alert_processor import ( - ACCOUNT_ID, ALERTS_TABLE, FUNCTION_NAME, OUTPUT_CONFIG_PATH, PREFIX, REGION) + ACCOUNT_ID, ALERTS_TABLE, FUNCTION_NAME, PREFIX, REGION) _ARN = 'arn:aws:lambda:{}:{}:function:{}:production'.format(REGION, ACCOUNT_ID, FUNCTION_NAME) @@ -39,9 +40,10 @@ class TestAlertProcessor(object): """Tests for alert_processor/main.py""" # pylint: disable=no-member,no-self-use,protected-access + @patch('stream_alert.alert_processor.main.load_config', + Mock(return_value=load_config('tests/unit/conf/', validate=True))) @patch.dict(os.environ, {'ALERTS_TABLE': ALERTS_TABLE}) @patch.object(AlertProcessor, 'BACKOFF_MAX_TRIES', 1) - @patch.object(AlertProcessor, 'OUTPUT_CONFIG_PATH', OUTPUT_CONFIG_PATH) @patch('stream_alert.alert_processor.main.AlertTable', MagicMock()) def setup(self): """Alert Processor - Test Setup""" diff --git a/tests/unit/stream_alert_alert_processor/test_outputs/test_aws.py b/tests/unit/stream_alert_alert_processor/test_outputs/test_aws.py index 1d2951fd0..508d9c8b4 100644 --- a/tests/unit/stream_alert_alert_processor/test_outputs/test_aws.py +++ b/tests/unit/stream_alert_alert_processor/test_outputs/test_aws.py @@ -35,7 +35,12 @@ ) from stream_alert_cli.helpers import create_lambda_function from tests.unit.stream_alert_alert_processor import ( - ACCOUNT_ID, CONFIG, FUNCTION_NAME, PREFIX, REGION) + ACCOUNT_ID, + CONFIG, + FUNCTION_NAME, + PREFIX, + REGION +) from tests.unit.stream_alert_alert_processor.helpers import get_alert diff --git a/tests/unit/stream_alert_alert_processor/test_outputs/test_github.py b/tests/unit/stream_alert_alert_processor/test_outputs/test_github.py index 48e2ea361..011e8fd45 100644 --- a/tests/unit/stream_alert_alert_processor/test_outputs/test_github.py +++ b/tests/unit/stream_alert_alert_processor/test_outputs/test_github.py @@ -22,8 +22,12 @@ from stream_alert.alert_processor.outputs.github import GithubOutput from stream_alert_cli.helpers import put_mock_creds -from tests.unit.stream_alert_alert_processor import \ - ACCOUNT_ID, CONFIG, FUNCTION_NAME, KMS_ALIAS, REGION +from tests.unit.stream_alert_alert_processor import ( + ACCOUNT_ID, + FUNCTION_NAME, + KMS_ALIAS, + REGION +) from tests.unit.stream_alert_alert_processor.helpers import ( get_alert, remove_temp_secrets @@ -43,7 +47,7 @@ class TestGithubOutput(object): def setup(self): """Setup before each method""" - self._dispatcher = GithubOutput(REGION, ACCOUNT_ID, FUNCTION_NAME, CONFIG) + self._dispatcher = GithubOutput(REGION, ACCOUNT_ID, FUNCTION_NAME, None) 
remove_temp_secrets() output_name = self._dispatcher.output_cred_name(self.DESCRIPTOR) put_mock_creds(output_name, self.CREDS, self._dispatcher.secrets_bucket, REGION, KMS_ALIAS) diff --git a/tests/unit/stream_alert_alert_processor/test_outputs/test_jira.py b/tests/unit/stream_alert_alert_processor/test_outputs/test_jira.py index 854797108..c18c4237e 100644 --- a/tests/unit/stream_alert_alert_processor/test_outputs/test_jira.py +++ b/tests/unit/stream_alert_alert_processor/test_outputs/test_jira.py @@ -20,8 +20,12 @@ from stream_alert.alert_processor.outputs.jira import JiraOutput from stream_alert_cli.helpers import put_mock_creds -from tests.unit.stream_alert_alert_processor import \ - ACCOUNT_ID, CONFIG, FUNCTION_NAME, KMS_ALIAS, REGION +from tests.unit.stream_alert_alert_processor import ( + ACCOUNT_ID, + FUNCTION_NAME, + KMS_ALIAS, + REGION +) from tests.unit.stream_alert_alert_processor.helpers import get_alert, remove_temp_secrets @@ -41,7 +45,7 @@ class TestJiraOutput(object): def setup(self): """Setup before each method""" - self._dispatcher = JiraOutput(REGION, ACCOUNT_ID, FUNCTION_NAME, CONFIG) + self._dispatcher = JiraOutput(REGION, ACCOUNT_ID, FUNCTION_NAME, None) self._dispatcher._base_url = self.CREDS['url'] remove_temp_secrets() output_name = self._dispatcher.output_cred_name(self.DESCRIPTOR) diff --git a/tests/unit/stream_alert_alert_processor/test_outputs/test_komand.py b/tests/unit/stream_alert_alert_processor/test_outputs/test_komand.py index 7f24cb780..4e8d9fcc2 100644 --- a/tests/unit/stream_alert_alert_processor/test_outputs/test_komand.py +++ b/tests/unit/stream_alert_alert_processor/test_outputs/test_komand.py @@ -20,8 +20,12 @@ from stream_alert.alert_processor.outputs.komand import KomandOutput from stream_alert_cli.helpers import put_mock_creds -from tests.unit.stream_alert_alert_processor import \ - ACCOUNT_ID, CONFIG, FUNCTION_NAME, KMS_ALIAS, REGION +from tests.unit.stream_alert_alert_processor import ( + ACCOUNT_ID, + FUNCTION_NAME, + KMS_ALIAS, + REGION +) from tests.unit.stream_alert_alert_processor.helpers import get_alert, remove_temp_secrets @@ -37,7 +41,7 @@ class TestKomandutput(object): def setup(self): """Setup before each method""" - self._dispatcher = KomandOutput(REGION, ACCOUNT_ID, FUNCTION_NAME, CONFIG) + self._dispatcher = KomandOutput(REGION, ACCOUNT_ID, FUNCTION_NAME, None) remove_temp_secrets() output_name = self._dispatcher.output_cred_name(self.DESCRIPTOR) put_mock_creds(output_name, self.CREDS, self._dispatcher.secrets_bucket, REGION, KMS_ALIAS) diff --git a/tests/unit/stream_alert_alert_processor/test_outputs/test_output_base.py b/tests/unit/stream_alert_alert_processor/test_outputs/test_output_base.py index dd85e7cff..ab3c5a5e3 100644 --- a/tests/unit/stream_alert_alert_processor/test_outputs/test_output_base.py +++ b/tests/unit/stream_alert_alert_processor/test_outputs/test_output_base.py @@ -35,8 +35,13 @@ ) from stream_alert.alert_processor.outputs.aws import S3Output from stream_alert_cli.helpers import encrypt_with_kms, put_mock_creds, put_mock_s3_object -from tests.unit.stream_alert_alert_processor import \ - ACCOUNT_ID, CONFIG, FUNCTION_NAME, KMS_ALIAS, REGION +from tests.unit.stream_alert_alert_processor import ( + ACCOUNT_ID, + CONFIG, + FUNCTION_NAME, + KMS_ALIAS, + REGION +) from tests.unit.stream_alert_alert_processor.helpers import remove_temp_secrets diff --git a/tests/unit/stream_alert_alert_processor/test_outputs/test_pagerduty.py b/tests/unit/stream_alert_alert_processor/test_outputs/test_pagerduty.py index 
6c7e74f46..d45ae834e 100644 --- a/tests/unit/stream_alert_alert_processor/test_outputs/test_pagerduty.py +++ b/tests/unit/stream_alert_alert_processor/test_outputs/test_pagerduty.py @@ -24,8 +24,13 @@ PagerDutyIncidentOutput ) from stream_alert_cli.helpers import put_mock_creds -from tests.unit.stream_alert_alert_processor import \ - ACCOUNT_ID, CONFIG, FUNCTION_NAME, KMS_ALIAS, REGION +from tests.unit.stream_alert_alert_processor import ( + ACCOUNT_ID, + FUNCTION_NAME, + KMS_ALIAS, + REGION +) + from tests.unit.stream_alert_alert_processor.helpers import get_alert, remove_temp_secrets @@ -41,7 +46,7 @@ class TestPagerDutyOutput(object): def setup(self): """Setup before each method""" - self._dispatcher = PagerDutyOutput(REGION, ACCOUNT_ID, FUNCTION_NAME, CONFIG) + self._dispatcher = PagerDutyOutput(REGION, ACCOUNT_ID, FUNCTION_NAME, None) remove_temp_secrets() output_name = self._dispatcher.output_cred_name(self.DESCRIPTOR) put_mock_creds(output_name, self.CREDS, self._dispatcher.secrets_bucket, REGION, KMS_ALIAS) @@ -93,7 +98,7 @@ class TestPagerDutyOutputV2(object): def setup(self): """Setup before each method""" - self._dispatcher = PagerDutyOutputV2(REGION, ACCOUNT_ID, FUNCTION_NAME, CONFIG) + self._dispatcher = PagerDutyOutputV2(REGION, ACCOUNT_ID, FUNCTION_NAME, None) remove_temp_secrets() output_name = self._dispatcher.output_cred_name(self.DESCRIPTOR) put_mock_creds(output_name, self.CREDS, self._dispatcher.secrets_bucket, REGION, KMS_ALIAS) @@ -156,7 +161,7 @@ class TestPagerDutyIncidentOutput(object): def setup(self): """Setup before each method""" - self._dispatcher = PagerDutyIncidentOutput(REGION, ACCOUNT_ID, FUNCTION_NAME, CONFIG) + self._dispatcher = PagerDutyIncidentOutput(REGION, ACCOUNT_ID, FUNCTION_NAME, None) self._dispatcher._base_url = self.CREDS['api'] remove_temp_secrets() output_name = self._dispatcher.output_cred_name(self.DESCRIPTOR) diff --git a/tests/unit/stream_alert_alert_processor/test_outputs/test_phantom.py b/tests/unit/stream_alert_alert_processor/test_outputs/test_phantom.py index 8eef627a6..d038df779 100644 --- a/tests/unit/stream_alert_alert_processor/test_outputs/test_phantom.py +++ b/tests/unit/stream_alert_alert_processor/test_outputs/test_phantom.py @@ -20,8 +20,13 @@ from stream_alert.alert_processor.outputs.phantom import PhantomOutput from stream_alert_cli.helpers import put_mock_creds -from tests.unit.stream_alert_alert_processor import \ - ACCOUNT_ID, CONFIG, FUNCTION_NAME, KMS_ALIAS, REGION +from tests.unit.stream_alert_alert_processor import ( + ACCOUNT_ID, + FUNCTION_NAME, + KMS_ALIAS, + REGION +) + from tests.unit.stream_alert_alert_processor.helpers import get_alert, remove_temp_secrets @@ -37,7 +42,7 @@ class TestPhantomOutput(object): def setup(self): """Setup before each method""" - self._dispatcher = PhantomOutput(REGION, ACCOUNT_ID, FUNCTION_NAME, CONFIG) + self._dispatcher = PhantomOutput(REGION, ACCOUNT_ID, FUNCTION_NAME, None) remove_temp_secrets() output_name = self._dispatcher.output_cred_name(self.DESCRIPTOR) put_mock_creds(output_name, self.CREDS, self._dispatcher.secrets_bucket, REGION, KMS_ALIAS) diff --git a/tests/unit/stream_alert_alert_processor/test_outputs/test_slack.py b/tests/unit/stream_alert_alert_processor/test_outputs/test_slack.py index 66b6de133..3f76046cb 100644 --- a/tests/unit/stream_alert_alert_processor/test_outputs/test_slack.py +++ b/tests/unit/stream_alert_alert_processor/test_outputs/test_slack.py @@ -22,8 +22,12 @@ from stream_alert.alert_processor.outputs.slack import SlackOutput from 
stream_alert_cli.helpers import put_mock_creds -from tests.unit.stream_alert_alert_processor import \ - ACCOUNT_ID, CONFIG, FUNCTION_NAME, KMS_ALIAS, REGION +from tests.unit.stream_alert_alert_processor import ( + ACCOUNT_ID, + FUNCTION_NAME, + KMS_ALIAS, + REGION +) from tests.unit.stream_alert_alert_processor.helpers import ( get_random_alert, get_alert, @@ -42,7 +46,7 @@ class TestSlackOutput(object): def setup(self): """Setup before each method""" - self._dispatcher = SlackOutput(REGION, ACCOUNT_ID, FUNCTION_NAME, CONFIG) + self._dispatcher = SlackOutput(REGION, ACCOUNT_ID, FUNCTION_NAME, None) remove_temp_secrets() output_name = self._dispatcher.output_cred_name(self.DESCRIPTOR) put_mock_creds(output_name, self.CREDS, self._dispatcher.secrets_bucket, REGION, KMS_ALIAS) From cba902905d4c613230969b75787cdc5d0891d3be Mon Sep 17 00:00:00 2001 From: Ryan Deivert Date: Wed, 25 Apr 2018 22:24:04 -0700 Subject: [PATCH 04/15] updates to threat intel downloader to use new config loading func --- .../threat_intel_downloader/exceptions.py | 4 +- stream_alert/threat_intel_downloader/main.py | 48 ++----------------- 2 files changed, 7 insertions(+), 45 deletions(-) diff --git a/stream_alert/threat_intel_downloader/exceptions.py b/stream_alert/threat_intel_downloader/exceptions.py index 712f4ffab..49bb03ab2 100644 --- a/stream_alert/threat_intel_downloader/exceptions.py +++ b/stream_alert/threat_intel_downloader/exceptions.py @@ -18,14 +18,14 @@ class ThreatStreamException(Exception): """Base exception class ThreatStream Error""" + class ThreatStreamCredsError(ThreatStreamException): """Class for API Credential errors""" -class ThreatStreamConfigError(ThreatStreamException): - """Class for Configuration errors""" class ThreatStreamLambdaInvokeError(ThreatStreamException): """Class for Lambda Invoke Error""" + class ThreatStreamRequestsError(ThreatStreamException): """Classe for requests return code errors""" diff --git a/stream_alert/threat_intel_downloader/main.py b/stream_alert/threat_intel_downloader/main.py index e35ba47a0..089f2fa13 100644 --- a/stream_alert/threat_intel_downloader/main.py +++ b/stream_alert/threat_intel_downloader/main.py @@ -15,25 +15,21 @@ """ from __future__ import absolute_import # Suppresses RuntimeWarning import error in Lambda import json -import os import boto3 from botocore.exceptions import ClientError +from stream_alert.shared.config import load_config, parse_lambda_arn from stream_alert.threat_intel_downloader.threat_stream import ThreatStream from stream_alert.threat_intel_downloader import LOGGER, END_TIME_BUFFER -from stream_alert.threat_intel_downloader.exceptions import ( - ThreatStreamLambdaInvokeError, - ThreatStreamConfigError -) - -CONFIG_FILE_PATH = 'conf/lambda.json' +from stream_alert.threat_intel_downloader.exceptions import ThreatStreamLambdaInvokeError def handler(event, context): """Lambda handler""" - config = load_config() - config.update(parse_lambda_func_arn(context)) + lambda_config = load_config(include={'lambda.json'})['lambda'] + config = lambda_config.get('threat_intel_downloader_config') + config.update(parse_lambda_arn(context.invoked_function_arn)) threat_stream = ThreatStream(config) intelligence, next_url, continue_invoke = threat_stream.runner(event) @@ -60,37 +56,3 @@ def invoke_lambda_function(next_url, config): except ClientError as err: LOGGER.error('Lambda client error: %s when lambda function invoke self', err) raise ThreatStreamLambdaInvokeError - -def parse_lambda_func_arn(context): - """Parse Lambda function arn to get 
function name, account id, region""" - func_arn = context.invoked_function_arn.split(':') - - return { - 'region': func_arn[3], - 'account_id': func_arn[4], - 'function_name': func_arn[6], - 'qualifier': func_arn[7] - } - -def load_config(): - """Load the Threat Intel Downloader configuration from conf/lambda.json file - - Returns: - (dict): Configuration for Threat Intel Downloader - - Raises: - ThreatStreamConfigError: For invalid or missing configuration files. - """ - config = {} - if not os.path.exists(CONFIG_FILE_PATH): - raise ThreatStreamConfigError('The \'{}\' config file was not found'.format( - CONFIG_FILE_PATH)) - - with open(CONFIG_FILE_PATH) as config_fh: - try: - config = json.load(config_fh) - except ValueError: - raise ThreatStreamConfigError('The \'{}\' config file is not valid JSON'.format( - CONFIG_FILE_PATH)) - - return config.get('threat_intel_downloader_config', None) From a82ff2563692fa60f967628f5f624e47b7a630cc Mon Sep 17 00:00:00 2001 From: Ryan Deivert Date: Wed, 25 Apr 2018 22:25:57 -0700 Subject: [PATCH 05/15] updates to threat intel downloader unit tests --- .../unit/threat_intel_downloader/__init__.py | 6 +- .../threat_intel_downloader/test_helpers.py | 69 ++++--------------- .../unit/threat_intel_downloader/test_main.py | 69 +++---------------- .../test_threat_stream.py | 2 +- 4 files changed, 30 insertions(+), 116 deletions(-) diff --git a/tests/unit/threat_intel_downloader/__init__.py b/tests/unit/threat_intel_downloader/__init__.py index 9a74eb2a5..78414cb14 100644 --- a/tests/unit/threat_intel_downloader/__init__.py +++ b/tests/unit/threat_intel_downloader/__init__.py @@ -1,4 +1,4 @@ -''' +""" Copyright 2017-present, Airbnb Inc. Licensed under the Apache License, Version 2.0 (the "License"); @@ -12,7 +12,9 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. -''' +""" +from stream_alert.shared.config import load_config REGION = 'us-east-1' FUNCTION_NAME = 'prefix_threat_intel_downloader' +CONFIG = load_config('tests/unit/conf/', include={'lambda.json'}) diff --git a/tests/unit/threat_intel_downloader/test_helpers.py b/tests/unit/threat_intel_downloader/test_helpers.py index 97656ef7e..d58dec638 100644 --- a/tests/unit/threat_intel_downloader/test_helpers.py +++ b/tests/unit/threat_intel_downloader/test_helpers.py @@ -13,42 +13,22 @@ See the License for the specific language governing permissions and limitations under the License. 
""" +from copy import deepcopy + from mock import Mock -from tests.unit.threat_intel_downloader import FUNCTION_NAME, REGION - - -LAMBDA_FILE = 'conf/lambda.json' - -LAMBDA_SETTINGS = { - 'alert_processor_config': { - 'handler': 'stream_alert.alert_processor.main.handler', - 'source_bucket': 'unit-testing.streamalert.source', - 'source_current_hash': '', - 'source_object_key': '', - 'third_party_libraries': [] - }, - 'rule_processor_config': { - 'handler': 'stream_alert.rule_processor.main.handler', - 'source_bucket': 'unit-testing.streamalert.source', - 'source_current_hash': '', - 'source_object_key': '', - 'third_party_libraries': [ - 'jsonpath_rw', - 'netaddr' - ] - }, - 'threat_intel_downloader_config': { - 'enabled': True, - 'handler': 'main.handler', - 'timeout': '60', - 'memory': '128', - 'source_bucket': 'unit-testing.streamalert.source', - 'source_current_hash': '', - 'source_object_key': '', - 'third_party_libraries': [] - } -} +from tests.unit.threat_intel_downloader import CONFIG, FUNCTION_NAME, REGION + +def mock_config(): + """Return a copy of the config with the env set""" + ti_config = deepcopy(CONFIG['lambda']['threat_intel_downloader_config']) + ti_config.update({ + 'account_id': '123456789012', + 'function_name': 'prefix_threat_intel_downloader', + 'qualifier': 'development', + 'region': 'us-east-1', + }) + return ti_config def get_mock_context(): @@ -114,24 +94,3 @@ def mock_invalid_ssm_response(): return { 'threat_intel_downloader_api_creds': 'invalid_value' } - - -def mock_config(): - '''Helper function to create a fake config for Threat Intel Downloader''' - return { - 'account_id': '123456789012', - 'function_name': 'prefix_threat_intel_downloader', - 'handler': 'stream_alert.threat_intel_downloader.main.handler', - 'interval': 'rate(1 day)', - 'ioc_filters': ['crowdstrike', '@airbnb.com'], - 'ioc_keys': ['expiration_ts', 'itype', 'source', 'type', 'value'], - 'ioc_types': ['domain', 'ip', 'md5'], - 'excluded_sub_types': ['bot_ip', 'brute_ip', 'scan_ip', 'spam_ip', 'tor_ip'], - 'log_level': 'info', - 'memory': '128', - 'qualifier': 'development', - 'region': 'us-east-1', - 'table_rcu': 10, - 'table_wcu': 10, - 'timeout': '180' - } diff --git a/tests/unit/threat_intel_downloader/test_main.py b/tests/unit/threat_intel_downloader/test_main.py index 67727a2ca..8721c24a4 100644 --- a/tests/unit/threat_intel_downloader/test_main.py +++ b/tests/unit/threat_intel_downloader/test_main.py @@ -14,45 +14,34 @@ limitations under the License. 
""" # pylint: disable=abstract-class-instantiated,protected-access,no-self-use -import json - from mock import patch, Mock -from nose.tools import ( - assert_equal, - raises -) +from nose.tools import raises from stream_alert.threat_intel_downloader.threat_stream import ThreatStream from stream_alert.threat_intel_downloader.main import ( handler, - invoke_lambda_function, - load_config, - parse_lambda_func_arn -) -from stream_alert.threat_intel_downloader.exceptions import ( - ThreatStreamLambdaInvokeError, - ThreatStreamConfigError + invoke_lambda_function ) +from stream_alert.threat_intel_downloader.exceptions import ThreatStreamLambdaInvokeError + from tests.unit.app_integrations.test_helpers import ( MockLambdaClient, MockSSMClient ) -from tests.unit.helpers.base import mock_open + +from tests.unit.threat_intel_downloader import CONFIG from tests.unit.threat_intel_downloader.test_helpers import ( get_mock_context, - mock_config, mock_requests_get, - mock_ssm_response, - LAMBDA_FILE, - LAMBDA_SETTINGS + mock_ssm_response ) @patch('stream_alert.threat_intel_downloader.main.load_config', - side_effect=mock_config) + Mock(return_value=CONFIG)) @patch('boto3.client') @patch.object(ThreatStream, '_connect') -def test_handler_without_next_token(mock_threatstream_connect, mock_ssm, mock_ti_config): # pylint: disable=unused-argument +def test_handler_without_next_token(mock_threatstream_connect, mock_ssm): """Threat Intel Downloader - Test handler""" mock_ssm.return_value = MockSSMClient(suppress_params=True, parameters=mock_ssm_response()) @@ -60,11 +49,11 @@ def test_handler_without_next_token(mock_threatstream_connect, mock_ssm, mock_ti mock_threatstream_connect.assert_not_called() @patch('stream_alert.threat_intel_downloader.main.load_config', - side_effect=mock_config) + Mock(return_value=CONFIG)) @patch('boto3.client') @patch('stream_alert.threat_intel_downloader.threat_stream.requests.get', side_effect=mock_requests_get) -def test_handler_next_token(mock_get, mock_ssm, mock_ti_config): # pylint: disable=unused-argument +def test_handler_next_token(mock_get, mock_ssm): """Threat Intel Downloader - Test handler with next token passed in""" mock_ssm.return_value = MockSSMClient(suppress_params=True, parameters=mock_ssm_response()) @@ -94,39 +83,3 @@ def test_invoke_lambda_function_error(): 'qualifier': 'development' } invoke_lambda_function('next_token', config) - -def test_parse_config(): - """Threat Intel Downloader - Test parse_lambda_func_arn""" - expect_config = { - 'region': 'us-east-1', - 'account_id': '123456789012', - 'function_name': 'prefix_threat_intel_downloader', - 'qualifier': 'development' - } - assert_equal(parse_lambda_func_arn(get_mock_context()), expect_config) - -@patch('os.path.exists', Mock(return_value=False)) -@raises(ThreatStreamConfigError) -def test_load_config_error(): - """Threat Intel Downloader - Test load_config and config file doesnot exist""" - load_config() - -@patch('os.path.exists', Mock(return_value=True)) -def test_load_valid_config(): - """Threat Intel Downloader - Test load valid config""" - lambda_settings = json.dumps(LAMBDA_SETTINGS) - expected_settings = { - 'enabled': True, - 'handler': 'main.handler', - 'timeout': '60', - 'memory': '128', - 'source_bucket': 'unit-testing.streamalert.source', - 'source_current_hash': '', - 'source_object_key': '', - 'third_party_libraries': [] - } - with mock_open(LAMBDA_FILE, lambda_settings): - assert_equal(load_config(), expected_settings) - - with mock_open(LAMBDA_FILE, json.dumps({'foo': 'bar'})): - 
assert_equal(load_config(), None)
diff --git a/tests/unit/threat_intel_downloader/test_threat_stream.py b/tests/unit/threat_intel_downloader/test_threat_stream.py
index 5a71d5ccd..f43c1bc9d 100644
--- a/tests/unit/threat_intel_downloader/test_threat_stream.py
+++ b/tests/unit/threat_intel_downloader/test_threat_stream.py
@@ -29,9 +29,9 @@
 from stream_alert.threat_intel_downloader.threat_stream import ThreatStream
 from tests.unit.app_integrations.test_helpers import MockSSMClient
 from tests.unit.threat_intel_downloader.test_helpers import (
-    mock_requests_get,
     mock_config,
     mock_invalid_ssm_response,
+    mock_requests_get,
     mock_ssm_response
 )
 

From 03c6fc63c69165ada3b3a9119af088678ea7cf98 Mon Sep 17 00:00:00 2001
From: Ryan Deivert
Date: Wed, 25 Apr 2018 22:26:40 -0700
Subject: [PATCH 06/15] adding tests for new config loading logic

---
 tests/unit/stream_alert_shared/test_config.py | 168 ++++++++++++++++++
 1 file changed, 168 insertions(+)
 create mode 100644 tests/unit/stream_alert_shared/test_config.py

diff --git a/tests/unit/stream_alert_shared/test_config.py b/tests/unit/stream_alert_shared/test_config.py
new file mode 100644
index 000000000..a2d8db031
--- /dev/null
+++ b/tests/unit/stream_alert_shared/test_config.py
@@ -0,0 +1,168 @@
+"""
+Copyright 2017-present, Airbnb Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+""" +from nose.tools import assert_equal, assert_items_equal, raises +from pyfakefs import fake_filesystem_unittest + +from stream_alert.shared.config import ( + _validate_config, + load_config, + parse_lambda_arn, + ConfigError, +) + +from tests.unit.stream_alert_rule_processor.test_helpers import get_mock_context, get_valid_config + +class TestConfigLoading(fake_filesystem_unittest.TestCase): + """Test config loading logic with a mocked filesystem.""" + # pylint: disable=protected-access + + def setUp(self): + self.setUpPyfakefs() + + # Add config files which should be loaded + self.fs.CreateFile('conf/clusters/prod.json', contents='{}') + self.fs.CreateFile('conf/clusters/dev.json', contents='{}') + self.fs.CreateFile('conf/global.json', contents='{}') + self.fs.CreateFile('conf/lambda.json', contents='{}') + self.fs.CreateFile('conf/logs.json', contents='{}') + self.fs.CreateFile('conf/outputs.json', contents='{}') + self.fs.CreateFile('conf/sources.json', contents='{}') + self.fs.CreateFile('conf/types.json', contents='{}') + + def tearDown(self): + self.tearDownPyfakefs() + + @raises(ConfigError) + def test_load_invalid_file(self): + """Shared - Config Loading - Bad JSON""" + self.fs.CreateFile('conf/clusters/bad.json', contents='test string') + load_config() + + @staticmethod + @raises(ConfigError) + def test_load_invalid_path(): + """Shared - Config Loading - Bad JSON""" + load_config(include={'foobar.json'}) + + @staticmethod + def test_load_all(): + """Shared - Config Loading - All""" + config = load_config() + expected_keys = ['clusters', 'global', 'lambda', 'logs', 'outputs', 'sources', 'types'] + assert_items_equal(config.keys(), expected_keys) + + @staticmethod + def test_load_exclude(): + """Shared - Config Loading - Exclude""" + config = load_config(exclude={'global.json', 'logs.json'}) + expected_keys = ['clusters', 'lambda', 'outputs', 'sources', 'types'] + assert_items_equal(config.keys(), expected_keys) + + @staticmethod + def test_load_exclude_clusters(): + """Shared - Config Loading - Exclude Clusters""" + config = load_config(exclude={'clusters'}) + expected_keys = ['global', 'lambda', 'logs', 'outputs', 'sources', 'types'] + assert_items_equal(config.keys(), expected_keys) + + @staticmethod + def test_load_include(): + """Shared - Config Loading - Include""" + config = load_config(include={'clusters', 'logs.json'}) + expected_keys = ['clusters', 'logs'] + expected_clusters_keys = ['prod', 'dev'] + assert_items_equal(config.keys(), expected_keys) + assert_items_equal(config['clusters'].keys(), expected_clusters_keys) + + +@raises(ConfigError) +def test_config_no_schema(): + """Shared - Config Validator - No Schema in Log""" + # Load a valid config + config = get_valid_config() + + # Remove the 'schema' keys from the config + config['logs']['json_log'].pop('schema') + config['logs']['csv_log'].pop('schema') + + _validate_config(config) + + +@raises(ConfigError) +def test_config_no_parsers(): + """Shared - Config Validator - No Parser in Log""" + # Load a valid config + config = get_valid_config() + + # Remove the 'parser' keys from the config + config['logs']['json_log'].pop('parser') + config['logs']['csv_log'].pop('parser') + + _validate_config(config) + + +@raises(ConfigError) +def test_config_no_logs_key(): + """Shared - Config Validator - No Logs Key in Source""" + # Load a valid config + config = get_valid_config() + + # Remove everything from the sources entry + config['sources']['kinesis']['stream_1'] = {} + + _validate_config(config) + + 
+@raises(ConfigError) +def test_config_empty_logs_list(): + """Shared - Config Validator - Empty Logs List in Source""" + # Load a valid config + config = get_valid_config() + + # Set the logs key to an empty list + config['sources']['kinesis']['stream_1']['logs'] = [] + + _validate_config(config) + + +@raises(ConfigError) +def test_config_invalid_datasources(): + """Shared - Config Validator - Invalid Datasources""" + # Load a valid config + config = get_valid_config() + + # Set the sources value to contain an invalid data source ('sqs') + config['sources'] = {'sqs': {'queue_1': {}}} + + _validate_config(config) + + +def test_parse_lambda_arn(): + """Shared - Config - Parse Lambda ARN""" + context = get_mock_context() + + env = parse_lambda_arn(context.invoked_function_arn) + assert_equal(env['region'], 'us-east-1') + assert_equal(env['account_id'], '123456789012') + assert_equal(env['function_name'], 'corp-prefix_prod_streamalert_rule_processor') + assert_equal(env['qualifier'], 'development') From 2771ad2330f7c2fbba943a4d6a8c3c9d0fb9c077 Mon Sep 17 00:00:00 2001 From: Ryan Deivert Date: Wed, 25 Apr 2018 22:27:36 -0700 Subject: [PATCH 07/15] updates to athena partition refresh to use new config loading func --- stream_alert/athena_partition_refresh/main.py | 37 +------------------ 1 file changed, 2 insertions(+), 35 deletions(-) diff --git a/stream_alert/athena_partition_refresh/main.py b/stream_alert/athena_partition_refresh/main.py index 03830a7c2..65e04ff56 100644 --- a/stream_alert/athena_partition_refresh/main.py +++ b/stream_alert/athena_partition_refresh/main.py @@ -30,40 +30,7 @@ giveup_handler, success_handler ) - - -def _load_config(): - """Load the StreamAlert Athena configuration files - - Returns: - dict: Configuration settings by file, includes two keys: - lambda, All lambda function settings - global, StreamAlert global settings - - Raises: - ConfigError: For invalid or missing configuration files. 
- """ - config_files = ('lambda', 'global') - config = {} - for config_file in config_files: - config_file_path = 'conf/{}.json'.format(config_file) - - if not os.path.exists(config_file_path): - raise ConfigError('The \'{}\' config file was not found'.format( - config_file_path)) - - with open(config_file_path) as config_fh: - try: - config[config_file] = json.load(config_fh) - except ValueError: - raise ConfigError('The \'{}\' config file is not valid JSON'.format( - config_file)) - - return config - - -class ConfigError(Exception): - """Custom StreamAlertAthena Config Exception Class""" +from stream_alert.shared.config import load_config class AthenaPartitionRefreshError(Exception): @@ -540,7 +507,7 @@ def unique_s3_buckets_and_keys(self): def handler(*_): """Athena Partition Refresher Handler Function""" - config = _load_config() + config = load_config(include={'lambda.json', 'global.json'}) # Initialize the SQS client and recieve messages stream_alert_sqs = StreamAlertSQSClient(config) From 5a0293193e84ac3de0f6911ee68434cfce39dba5 Mon Sep 17 00:00:00 2001 From: Ryan Deivert Date: Wed, 25 Apr 2018 22:27:56 -0700 Subject: [PATCH 08/15] updates to athena partition refresh unit tests --- .../test_main.py | 40 +------------------ 1 file changed, 2 insertions(+), 38 deletions(-) diff --git a/tests/unit/stream_alert_athena_partition_refresh/test_main.py b/tests/unit/stream_alert_athena_partition_refresh/test_main.py index 5208d28bb..364313e7c 100644 --- a/tests/unit/stream_alert_athena_partition_refresh/test_main.py +++ b/tests/unit/stream_alert_athena_partition_refresh/test_main.py @@ -29,24 +29,18 @@ assert_is_instance, assert_is_none, assert_true, - raises, nottest, with_setup ) import stream_alert.athena_partition_refresh as apr from stream_alert.athena_partition_refresh.main import ( - _load_config, handler, - ConfigError, StreamAlertAthenaClient, StreamAlertSQSClient, ) from tests.unit.helpers.aws_mocks import MockAthenaClient, MockSqsClient -from tests.unit.helpers.base import mock_open -GLOBAL_FILE = 'conf/global.json' -LAMBDA_FILE = 'conf/lambda.json' TEST_REGION = 'us-east-2' CONFIG_DATA = { @@ -106,38 +100,8 @@ class TestStreamAlertAthenaGlobals(object): """Test class for global functions in Athena Partition Refresh""" # pylint: disable=no-self-use - @raises(ConfigError) - def test_invalid_json_config(self): - """Athena - Load Invalid Config""" - invalid_config_data = 'This is not JSON!!!' 
- with mock_open(LAMBDA_FILE, invalid_config_data): - with mock_open(GLOBAL_FILE, invalid_config_data): - _load_config() - - @raises(ConfigError) - def test_invalid_missing_config(self): - """Athena - Load Missing Config File""" - invalid_config_data = 'test' - with mock_open(LAMBDA_FILE, invalid_config_data): - with mock_open(GLOBAL_FILE, invalid_config_data): - with patch('os.path.exists') as mock_exists: - mock_exists.return_value = False - _load_config() - - def test_load_valid_config(self): - """Athena - Load Config""" - global_contents = json.dumps(CONFIG_DATA['global'], indent=4) - lambda_contents = json.dumps(CONFIG_DATA['lambda'], indent=4) - - with mock_open(GLOBAL_FILE, global_contents): - with mock_open(LAMBDA_FILE, lambda_contents): - config = _load_config() - - assert_equal(type(config), dict) - assert_equal(set(config), {'global', 'lambda'}) - @patch('stream_alert.athena_partition_refresh.main.LOGGER') - @patch('stream_alert.athena_partition_refresh.main._load_config', + @patch('stream_alert.athena_partition_refresh.main.load_config', return_value=CONFIG_DATA) @patch('stream_alert.athena_partition_refresh.main.StreamAlertSQSClient') @mock_sqs @@ -154,7 +118,7 @@ def test_handler_no_received_messages( assert_true(mock_logging.info.called) @patch('stream_alert.athena_partition_refresh.main.LOGGER') - @patch('stream_alert.athena_partition_refresh.main._load_config', + @patch('stream_alert.athena_partition_refresh.main.load_config', return_value=CONFIG_DATA) @patch('stream_alert.athena_partition_refresh.main.StreamAlertSQSClient') @mock_sqs From dff31a937b8e0b08f84061fd4601795928d71733 Mon Sep 17 00:00:00 2001 From: Ryan Deivert Date: Wed, 25 Apr 2018 22:28:28 -0700 Subject: [PATCH 09/15] updating unit test lambda.json config file --- tests/unit/conf/lambda.json | 35 ++++++++++++++++++++++++++++++++--- 1 file changed, 32 insertions(+), 3 deletions(-) diff --git a/tests/unit/conf/lambda.json b/tests/unit/conf/lambda.json index edf962a82..758db148d 100644 --- a/tests/unit/conf/lambda.json +++ b/tests/unit/conf/lambda.json @@ -44,12 +44,41 @@ "source_bucket": "unit-testing.streamalert.source", "source_current_hash": "12345", "source_object_key": "lambda/athena/source.zip", - "third_party_libraries": [ - "backoff" - ], "timeout": "60" }, "rule_processor_config": { "source_bucket": "unit.testing.source.bucket" + }, + "threat_intel_downloader_config": { + "enabled": true, + "excluded_sub_types": [ + "bot_ip", + "brute_ip", + "scan_ip", + "spam_ip", + "tor_ip" + ], + "handler": "main.handler", + "ioc_filters": [ + "crowdstrike", + "@airbnb.com" + ], + "ioc_keys": [ + "expiration_ts", + "itype", + "source", + "type", + "value" + ], + "ioc_types": [ + "domain", + "ip", + "md5" + ], + "memory": "128", + "source_bucket": "unit-testing.streamalert.source", + "source_current_hash": "", + "source_object_key": "", + "timeout": "60" } } \ No newline at end of file From bac158aa0532c6b8835fcf2ea3fb3246e489fa43 Mon Sep 17 00:00:00 2001 From: Ryan Deivert Date: Wed, 25 Apr 2018 22:29:35 -0700 Subject: [PATCH 10/15] updates to rule processor to use new config loading func --- stream_alert/rule_processor/config.py | 136 ------------------------- stream_alert/rule_processor/handler.py | 13 ++- 2 files changed, 6 insertions(+), 143 deletions(-) delete mode 100644 stream_alert/rule_processor/config.py diff --git a/stream_alert/rule_processor/config.py b/stream_alert/rule_processor/config.py deleted file mode 100644 index f88d599cf..000000000 --- a/stream_alert/rule_processor/config.py +++ /dev/null @@ 
-1,136 +0,0 @@ -""" -Copyright 2017-present, Airbnb Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -""" -from collections import OrderedDict -import json -import os - - -class ConfigError(Exception): - """Exception class for config file errors""" - - -def load_config(conf_dir='conf/'): - """Load the configuration for StreamAlert. - - All configuration files live in the `conf` directory in JSON format. - `sources` define a colleciton of AWS services (S3, Kinesis) supported as - inputs to StreamAlert, specific entities (S3 buckets, Kinesis streams), - and log types emitted from them. - - `logs` declare the schema for the listed log types in `sources`. Each - key denotes the name of the log type, and includes 'keys' used to match - rules to log fields. - """ - conf_files = ('sources', 'logs', 'types', 'global', 'clusters') - config = dict() - for base_name in conf_files: - if base_name == 'clusters': - # Load current cluster config into memory for threat intel - path = os.path.join(conf_dir, base_name) - config['clusters'] = {} - cluster = os.environ.get('CLUSTER', '') - if cluster: - cluster_conf_path = '{}.json'.format(os.path.join(path, cluster)) - with open(cluster_conf_path) as data: - try: - config['clusters'][cluster] = json.load(data) - except ValueError: - raise ConfigError('Invalid JSON format for {}'.format(cluster_conf_path)) - else: - path = '{}.json'.format(os.path.join(conf_dir, base_name)) - with open(path) as data: - try: - # we use object_pairs_hook=OrderdDict to preserve schema order - # for CSV/KV log types - config[base_name] = json.load(data, object_pairs_hook=OrderedDict) - except ValueError: - raise ConfigError('Invalid JSON format for {}.json'.format(base_name)) - - # Validate the config. This will raise an exception on any errors, - # which bubbles up and will immediately break execution of the function - _validate_config(config) - - return config - - -def _validate_config(config): - """Validate the StreamAlert configuration contains a valid structure. - - Checks for `logs.json`: - - each log has a schema and parser declared - Checks for `sources.json` - - the sources contains either kinesis or s3 keys - - each sources has a list of logs declared - """ - # Check the log declarations - for log, attrs in config['logs'].iteritems(): - if 'schema' not in attrs: - raise ConfigError('The \'schema\' is missing for {}'.format(log)) - - if 'parser' not in attrs: - raise ConfigError('The \'parser\' is missing for {}'.format(log)) - - # Check if the defined sources are supported and report any invalid entries - supported_sources = {'kinesis', 's3', 'sns', 'stream_alert_app'} - if not set(config['sources']).issubset(supported_sources): - missing_sources = supported_sources - set(config['sources']) - raise ConfigError( - 'The \'sources.json\' file contains invalid source entries: {}. 
' - 'The following sources are supported: {}'.format( - ', '.join('\'{}\''.format(source) for source in missing_sources), - ', '.join('\'{}\''.format(source) for source in supported_sources) - ) - ) - - # Iterate over each defined source and make sure the required subkeys exist - for attrs in config['sources'].values(): - for entity, entity_attrs in attrs.iteritems(): - if 'logs' not in entity_attrs: - raise ConfigError('Missing \'logs\' key for entity: {}'.format(entity)) - - if not entity_attrs['logs']: - raise ConfigError( - 'List of \'logs\' is empty for entity: {}'.format(entity)) - -def load_env(context): - """Get the current environment for the running Lambda function. - - Parses the invoked_function_arn from the given context object to get - the name of the currently running alias (either production or staging) - and the name of the function. - - Example: - arn:aws:lambda:aws-region:acct-id:function:stream_alert:production - - Args: - context: The AWS Lambda context object. - - Returns: - dict: - { - 'lambda_region': 'region_name', - 'account_id': 'account_id', - 'lambda_function_name': 'function_name', - 'lambda_alias': 'qualifier' - } - """ - arn = context.invoked_function_arn.split(':') - return { - 'lambda_region': arn[3], - 'account_id': arn[4], - 'lambda_function_name': arn[6], - 'lambda_alias': arn[7] - } diff --git a/stream_alert/rule_processor/handler.py b/stream_alert/rule_processor/handler.py index 8ec3061bb..c1c22bf44 100644 --- a/stream_alert/rule_processor/handler.py +++ b/stream_alert/rule_processor/handler.py @@ -19,11 +19,10 @@ from stream_alert.rule_processor import FUNCTION_NAME, LOGGER from stream_alert.rule_processor.alert_forward import AlertForwarder from stream_alert.rule_processor.classifier import StreamClassifier -from stream_alert.rule_processor.config import load_config, load_env from stream_alert.rule_processor.firehose import StreamAlertFirehose from stream_alert.rule_processor.payload import load_stream_payload from stream_alert.rule_processor.rules_engine import RulesEngine -from stream_alert.shared import stats +from stream_alert.shared import config, stats from stream_alert.shared.metrics import MetricLogger @@ -39,11 +38,11 @@ def __init__(self, context): executing lambda function. """ # Load the config. 
Validation occurs during load, which will - # raise exceptions on any ConfigErrors - StreamAlert.config = StreamAlert.config or load_config() + # raise exceptions on any ConfigError + StreamAlert.config = StreamAlert.config or config.load_config(validate=True) # Load the environment from the context arn - self.env = load_env(context) + self.env = config.parse_lambda_arn(context.invoked_function_arn) # Instantiate the send_alerts here to handle sending the triggered alerts to the # alert processor @@ -162,7 +161,7 @@ def run(self, event): # Only log rule info here if this is not running tests # During testing, this gets logged at the end and printing here could be confusing # since stress testing calls this method multiple times - if self.env['lambda_alias'] != 'development': + if self.env['qualifier'] != 'development': stats.print_rule_stats(True) return self._failed_record_count == 0 @@ -184,7 +183,7 @@ def _process_alerts(self, payload): self._processed_size += len(record.pre_parsed_record) self.classifier.classify_record(record) if not record.valid: - if self.env['lambda_alias'] != 'development': + if self.env['qualifier'] != 'development': LOGGER.error('Record does not match any defined schemas: %s\n%s', record, record.pre_parsed_record) From 14a2defa83efeaef5f1defcf874c0237b94f7f09 Mon Sep 17 00:00:00 2001 From: Ryan Deivert Date: Wed, 25 Apr 2018 22:30:24 -0700 Subject: [PATCH 11/15] updates to rule processor unit tests --- .../test_classifier.py | 4 +- .../test_config.py | 119 ------------------ .../test_firehose.py | 2 +- .../test_handler.py | 13 +- .../test_parsers.py | 2 +- .../test_rules_engine.py | 2 +- .../test_threat_intel.py | 2 +- 7 files changed, 13 insertions(+), 131 deletions(-) delete mode 100644 tests/unit/stream_alert_rule_processor/test_config.py diff --git a/tests/unit/stream_alert_rule_processor/test_classifier.py b/tests/unit/stream_alert_rule_processor/test_classifier.py index 43502f4d0..478ea8a7b 100644 --- a/tests/unit/stream_alert_rule_processor/test_classifier.py +++ b/tests/unit/stream_alert_rule_processor/test_classifier.py @@ -26,8 +26,8 @@ ) import stream_alert.rule_processor.classifier as sa_classifier -from stream_alert.rule_processor.config import load_config from stream_alert.rule_processor.payload import load_stream_payload +from stream_alert.shared.config import load_config from tests.unit.stream_alert_rule_processor.test_helpers import make_kinesis_raw_record @@ -39,7 +39,7 @@ def __init__(self): def setup(self): """Setup before each method""" - config = load_config('tests/unit/conf') + config = load_config('tests/unit/conf', validate=True) self.classifier = sa_classifier.StreamClassifier(config) def _prepare_and_classify_payload(self, service, entity, raw_record): diff --git a/tests/unit/stream_alert_rule_processor/test_config.py b/tests/unit/stream_alert_rule_processor/test_config.py deleted file mode 100644 index 98bc89c3e..000000000 --- a/tests/unit/stream_alert_rule_processor/test_config.py +++ /dev/null @@ -1,119 +0,0 @@ -""" -Copyright 2017-present, Airbnb Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
-See the License for the specific language governing permissions and -limitations under the License. -""" -# pylint: disable=protected-access -from mock import mock_open, patch -from nose.tools import assert_equal, raises, nottest - -from stream_alert.rule_processor.config import ( - _validate_config, - ConfigError, - load_config, - load_env -) - -from tests.unit.stream_alert_rule_processor.test_helpers import get_mock_context, get_valid_config - - -@raises(ConfigError) -def test_load_config_invalid(): - """Config Validator - Load Config - Invalid""" - mocker = mock_open(read_data='test string that will throw an error') - with patch('__builtin__.open', mocker): - load_config() - - -@raises(ConfigError) -def test_config_no_schema(): - """Config Validator - No Schema in Log""" - # Load a valid config - config = get_valid_config() - - # Remove the 'schema' keys from the config - config['logs']['json_log'].pop('schema') - config['logs']['csv_log'].pop('schema') - - _validate_config(config) - - -@raises(ConfigError) -def test_config_no_parsers(): - """Config Validator - No Parser in Log""" - # Load a valid config - config = get_valid_config() - - # Remove the 'parser' keys from the config - config['logs']['json_log'].pop('parser') - config['logs']['csv_log'].pop('parser') - - _validate_config(config) - - -@raises(ConfigError) -def test_config_no_logs_key(): - """Config Validator - No Logs Key in Source""" - # Load a valid config - config = get_valid_config() - - # Remove everything from the sources entry - config['sources']['kinesis']['stream_1'] = {} - - _validate_config(config) - - -@raises(ConfigError) -def test_config_empty_logs_list(): - """Config Validator - Empty Logs List in Source""" - # Load a valid config - config = get_valid_config() - - # Set the logs key to an empty list - config['sources']['kinesis']['stream_1']['logs'] = [] - - _validate_config(config) - - -@raises(ConfigError) -def test_config_invalid_datasources(): - """Config Validator - Invalid Datasources""" - # Load a valid config - config = get_valid_config() - - # Set the sources value to contain an invalid data source ('sqs') - config['sources'] = {'sqs': {'queue_1': {}}} - - _validate_config(config) - - -def test_load_env(): - """Config - Environment Loader""" - context = get_mock_context() - - env = load_env(context) - assert_equal(env['lambda_region'], 'us-east-1') - assert_equal(env['account_id'], '123456789012') - assert_equal(env['lambda_function_name'], - 'corp-prefix_prod_streamalert_rule_processor') - assert_equal(env['lambda_alias'], 'development') - - -@nottest -#TODO(chunyong) add assertions to this test -def test_config_valid_types(): - """Config Validator - valid normalized types""" - # Load a valid config - config = load_config() - - _validate_config(config) diff --git a/tests/unit/stream_alert_rule_processor/test_firehose.py b/tests/unit/stream_alert_rule_processor/test_firehose.py index a8c08c6f0..8bdc5ccb8 100644 --- a/tests/unit/stream_alert_rule_processor/test_firehose.py +++ b/tests/unit/stream_alert_rule_processor/test_firehose.py @@ -18,8 +18,8 @@ from moto import mock_kinesis from nose.tools import (assert_equal, assert_false, assert_true) -from stream_alert.rule_processor.config import load_config from stream_alert.rule_processor.firehose import StreamAlertFirehose +from stream_alert.shared.config import load_config @patch('stream_alert.rule_processor.firehose.StreamAlertFirehose.MAX_BACKOFF_ATTEMPTS', 1) diff --git a/tests/unit/stream_alert_rule_processor/test_handler.py 
b/tests/unit/stream_alert_rule_processor/test_handler.py index bb1a4f206..429103003 100644 --- a/tests/unit/stream_alert_rule_processor/test_handler.py +++ b/tests/unit/stream_alert_rule_processor/test_handler.py @@ -19,7 +19,7 @@ import logging import os -from mock import ANY, call, patch +from mock import ANY, call, Mock, patch from moto import mock_dynamodb2, mock_kinesis from nose.tools import ( assert_equal, @@ -30,9 +30,10 @@ import boto3 from stream_alert.rule_processor import LOGGER -from stream_alert.rule_processor.handler import load_config, StreamAlert +from stream_alert.rule_processor.handler import StreamAlert from stream_alert.rule_processor.threat_intel import StreamThreatIntel from stream_alert.shared.alert import Alert +from stream_alert.shared.config import load_config from stream_alert.shared.rule import rule from tests.unit.stream_alert_rule_processor.test_helpers import ( convert_events_to_kinesis, @@ -47,8 +48,8 @@ class TestStreamAlert(object): """Test class for StreamAlert class""" - @patch('stream_alert.rule_processor.handler.load_config', - lambda: load_config('tests/unit/conf/')) + @patch('stream_alert.rule_processor.handler.config.load_config', + Mock(return_value=load_config('tests/unit/conf/', validate=True))) @patch.dict(os.environ, {'ALERT_PROCESSOR': 'unit-testing_streamalert_alert_processor', 'ALERTS_TABLE': 'unit-testing_streamalert_alerts', 'AWS_DEFAULT_REGION': 'us-east-1'}) @@ -166,7 +167,7 @@ def test_run_invalid_data(self, extract_mock, log_mock): event['Records'][0]['kinesis']['data'] = base64.b64encode('{"bad": "data"}') # Swap out the alias so the logging occurs - self.__sa_handler.env['lambda_alias'] = 'production' + self.__sa_handler.env['qualifier'] = 'production' self.__sa_handler.run(event) assert_equal( @@ -183,7 +184,7 @@ def test_run_send_alerts(self, extract_mock, rules_mock, forwarder_mock): rules_mock.return_value = (['success!!'], ['normalized_records']) # Swap out the alias so the logging occurs - self.__sa_handler.env['lambda_alias'] = 'production' + self.__sa_handler.env['qualifier'] = 'production' self.__sa_handler.run(get_valid_event()) diff --git a/tests/unit/stream_alert_rule_processor/test_parsers.py b/tests/unit/stream_alert_rule_processor/test_parsers.py index 36bf03645..1eeb269ba 100644 --- a/tests/unit/stream_alert_rule_processor/test_parsers.py +++ b/tests/unit/stream_alert_rule_processor/test_parsers.py @@ -25,8 +25,8 @@ assert_true ) -from stream_alert.rule_processor.config import load_config from stream_alert.rule_processor.parsers import get_parser +from stream_alert.shared.config import load_config class TestParser(object): diff --git a/tests/unit/stream_alert_rule_processor/test_rules_engine.py b/tests/unit/stream_alert_rule_processor/test_rules_engine.py index 71b371d8b..87ff3d702 100644 --- a/tests/unit/stream_alert_rule_processor/test_rules_engine.py +++ b/tests/unit/stream_alert_rule_processor/test_rules_engine.py @@ -26,10 +26,10 @@ assert_true, ) -from stream_alert.rule_processor.config import load_config from stream_alert.rule_processor.parsers import get_parser from stream_alert.rule_processor.rules_engine import RulesEngine from stream_alert.shared import NORMALIZATION_KEY +from stream_alert.shared.config import load_config from stream_alert.shared.rule import disable, matcher, Matcher, rule, Rule from tests.unit.stream_alert_rule_processor.test_helpers import ( diff --git a/tests/unit/stream_alert_rule_processor/test_threat_intel.py b/tests/unit/stream_alert_rule_processor/test_threat_intel.py index 
dcc146ac8..a89d62d78 100644 --- a/tests/unit/stream_alert_rule_processor/test_threat_intel.py +++ b/tests/unit/stream_alert_rule_processor/test_threat_intel.py @@ -24,8 +24,8 @@ raises, ) -from stream_alert.rule_processor.config import load_config from stream_alert.rule_processor.threat_intel import StreamThreatIntel, StreamIoc +from stream_alert.shared.config import load_config from tests.unit.stream_alert_rule_processor.test_helpers import ( MockDynamoDBClient, mock_normalized_records, From b67d3053feec08e069baeead102cb4080112a7fc Mon Sep 17 00:00:00 2001 From: Ryan Deivert Date: Wed, 25 Apr 2018 22:31:49 -0700 Subject: [PATCH 12/15] removing unnecessary logic related to config from CLI --- stream_alert_cli/outputs.py | 65 ----------------- stream_alert_cli/runner.py | 13 ++-- stream_alert_cli/test.py | 3 +- tests/unit/stream_alert_cli/test_outputs.py | 80 +-------------------- 4 files changed, 11 insertions(+), 150 deletions(-) diff --git a/stream_alert_cli/outputs.py b/stream_alert_cli/outputs.py index 52aab250e..a931ea7f9 100644 --- a/stream_alert_cli/outputs.py +++ b/stream_alert_cli/outputs.py @@ -14,64 +14,12 @@ limitations under the License. """ import json -import os import boto3 from botocore.exceptions import ClientError from stream_alert_cli.logger import LOGGER_CLI -OUTPUTS_CONFIG = 'outputs.json' - - -def load_outputs_config(conf_dir='conf'): - """Load the outputs configuration file from disk - - Args: - conf_dir (str): Directory to read outputs config from - - Returns: - dict: The output configuration settings - """ - with open(os.path.join(conf_dir, OUTPUTS_CONFIG)) as outputs: - try: - values = json.load(outputs) - except ValueError: - LOGGER_CLI.error( - 'The %s file could not be loaded into json', - OUTPUTS_CONFIG) - raise - - return values - - -def write_outputs_config(data, conf_dir='conf'): - """Write the outputs configuration file back to disk - - Args: - data (dict): Dictionary to be converted to json and written to disk - conf_dir (str): Directory to write outputs config to - """ - with open(os.path.join(conf_dir, OUTPUTS_CONFIG), 'w') as outputs: - json.dump(data, outputs, indent=2, separators=(',', ': '), sort_keys=True) - - -def load_config(props, service): - """Gets the outputs config from disk and checks if the output already exists - - Args: - props (OrderedDict): Contains various OutputProperty items - service (str): The service for which the user is adding a configuration - - Returns: - dict: If the output doesn't exist, return the configuration, otherwise return False - """ - config = load_outputs_config() - if output_exists(config, props, service): - return False - - return config - def encrypt_and_push_creds_to_s3(region, bucket, key, props, kms_key_alias): """Construct a dictionary of the credentials we want to encrypt and send to s3 @@ -162,16 +110,3 @@ def output_exists(config, props, service): return True return False - - -def update_outputs_config(config, updated_config, service): - """Updates and writes the outputs config back to disk - - Args: - config (dict): The loaded configuration as a dictionary - updated_config: The updated configuration for this service. 
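With the load/write helpers deleted, the module's remaining surface is essentially `output_exists` plus `encrypt_and_push_creds_to_s3`. A hedged sketch of how the two compose; the resource names are the unit-test placeholders, and the service-to-descriptor-list shape of the outputs config is assumed from those tests:

```python
# Hedged sketch; bucket, key, and KMS alias are test placeholders, not real resources.
from stream_alert.alert_processor.outputs.output_base import OutputProperty
from stream_alert_cli.outputs import encrypt_and_push_creds_to_s3, output_exists

props = {'descriptor': OutputProperty('short description', 'unit_test_lambda')}
outputs_config = {'aws-lambda': []}  # assumed shape: service -> list of descriptors

if not output_exists(outputs_config, props, 'aws-lambda'):
    encrypt_and_push_creds_to_s3('us-east-1', 'bucket', 'key', props, 'test_alias')
```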
this could - be a list, dictionary, etc depending on how this services stores config info - service (str): The service whose configuration is being updated - """ - config[service] = updated_config - write_outputs_config(config) diff --git a/stream_alert_cli/runner.py b/stream_alert_cli/runner.py index eab5665fe..84ef4efcb 100644 --- a/stream_alert_cli/runner.py +++ b/stream_alert_cli/runner.py @@ -136,11 +136,11 @@ def configure_output(options): props[name] = prop._replace( value=user_input(prop.description, prop.mask_input, prop.input_restrictions)) + output_config = CONFIG['outputs'] service = output.__service__ - config = config_outputs.load_config(props, service) - # An empty config here means this configuration already exists, - # so we can ask for user input again for a unique configuration - if config is False: + + # If it exists already, ask for user input again for a unique configuration + if config_outputs.output_exists(output_config, props, service): return configure_output(options) secrets_bucket = '{}.streamalert.secrets'.format(prefix) @@ -150,8 +150,9 @@ def configure_output(options): # then update the local output configuration with properties if config_outputs.encrypt_and_push_creds_to_s3(region, secrets_bucket, secrets_key, props, kms_key_alias): - updated_config = output.format_output_config(config, props) - config_outputs.update_outputs_config(config, updated_config, service) + updated_config = output.format_output_config(output_config, props) + output_config[service] = updated_config + CONFIG.write() LOGGER_CLI.info('Successfully saved \'%s\' output configuration for service \'%s\'', props['descriptor'].value, options.service) diff --git a/stream_alert_cli/test.py b/stream_alert_cli/test.py index da330e6e1..8439fa3fd 100644 --- a/stream_alert_cli/test.py +++ b/stream_alert_cli/test.py @@ -41,7 +41,6 @@ SuppressNoise ) -from stream_alert_cli.outputs import load_outputs_config TEST_EVENTS_DIR = 'tests/integration/rules' COLOR_RED = '\033[0;31;1m' @@ -655,7 +654,7 @@ def __init__(self, config, context): self.kms_alias = 'alias/stream_alert_secrets_test' self.secrets_bucket = 'test.streamalert.secrets' self.outputs_config = resources.merge_required_outputs( - load_outputs_config(), + config['outputs'], 'test-prefix' ) self.region = config['global']['account']['region'] diff --git a/tests/unit/stream_alert_cli/test_outputs.py b/tests/unit/stream_alert_cli/test_outputs.py index cb2f33d60..f14738aa6 100644 --- a/tests/unit/stream_alert_cli/test_outputs.py +++ b/tests/unit/stream_alert_cli/test_outputs.py @@ -15,70 +15,12 @@ """ import boto3 from botocore.exceptions import ClientError -from mock import mock_open, patch +from mock import patch from moto import mock_kms, mock_s3 -from nose.tools import assert_false, assert_list_equal, assert_true, raises +from nose.tools import assert_true, raises from stream_alert.alert_processor.outputs.output_base import OutputProperty -from stream_alert_cli.outputs import ( - encrypt_and_push_creds_to_s3, - load_config, - load_outputs_config, - update_outputs_config, - write_outputs_config -) - - -def test_load_output_config(): - """CLI - Outputs - Load outputs configuration""" - config = load_outputs_config('tests/unit/conf') - loaded_config_keys = sorted(config.keys()) - - expected_config_keys = [ - 'aws-firehose', - 'aws-lambda', - 'aws-s3', - 'aws-sns', - 'aws-sqs', - 'pagerduty', - 'phantom', - 'slack' - ] - - assert_list_equal(loaded_config_keys, expected_config_keys) - - -@raises(ValueError) -def test_load_output_config_error(): - 
"""CLI - Outputs - Load outputs configuration - exception""" - mock = mock_open(read_data='non-json string that will raise an exception') - with patch('__builtin__.open', mock): - load_outputs_config() - - -@patch('json.dump') -def test_write_outputs_config(json_mock): - """CLI - Outputs - Write outputs configuration""" - with patch('__builtin__.open', new_callable=mock_open()) as mocker: - data = {'test': 'values', 'to': 'write'} - write_outputs_config(data) - json_mock.assert_called_with(data, mocker.return_value.__enter__.return_value, - indent=2, separators=(',', ': '), sort_keys=True) - - -@patch('stream_alert_cli.outputs.load_outputs_config') -def test_load_config(method_mock): - """CLI - Outputs - Load config - check for existing output""" - # Patch the return value of the load_outputs_config method to return - # the unit testing outputs configuration - method_mock.return_value = load_outputs_config(conf_dir="tests/unit/conf") - props = { - 'descriptor': OutputProperty( - 'short description', - 'unit_test_lambda')} - loaded = load_config(props, 'aws-lambda') - - assert_false(loaded) +from stream_alert_cli.outputs import encrypt_and_push_creds_to_s3 @patch('stream_alert_cli.outputs.encrypt_and_push_creds_to_s3') @@ -134,19 +76,3 @@ def test_encrypt_and_push_creds_to_s3_kms_failure(log_mock, boto_mock): encrypt_and_push_creds_to_s3('us-east-1', 'bucket', 'key', props, 'test_alias') log_mock.assert_called_with('An error occurred during credential encryption') - - -@patch('json.dump') -def test_update_outputs_config(json_mock): - """CLI - Outputs - Update outputs config""" - with patch('__builtin__.open', new_callable=mock_open()) as mocker: - service = 'mock_service' - original_config = {service: ['value01', 'value02']} - new_config_values = ['value01', 'value02', 'value03'] - - update_outputs_config(original_config, new_config_values, service) - - expected_value = {'mock_service': ['value01', 'value02', 'value03']} - - json_mock.assert_called_with(expected_value, mocker.return_value.__enter__.return_value, - indent=2, separators=(',', ': '), sort_keys=True) From 7c255896e47758b9cd19185df84e7a87a6522e23 Mon Sep 17 00:00:00 2001 From: Ryan Deivert Date: Wed, 25 Apr 2018 22:32:55 -0700 Subject: [PATCH 13/15] updating the CLIConfig class to use the shared config loading logic --- conf/global.json | 8 +- conf/lambda.json | 2 +- stream_alert/alert_processor/main.py | 1 - stream_alert_cli/config.py | 89 ++++--------------- .../terraform/test_generate.py | 2 - .../terraform/test_s3_events.py | 2 - 6 files changed, 24 insertions(+), 80 deletions(-) diff --git a/conf/global.json b/conf/global.json index 9280d5f86..e606357d6 100644 --- a/conf/global.json +++ b/conf/global.json @@ -6,11 +6,11 @@ "region": "us-east-1" }, "general": { - "rule_locations": [ - "rules" - ], "matcher_locations": [ "matchers" + ], + "rule_locations": [ + "rules" ] }, "infrastructure": { @@ -39,4 +39,4 @@ "tfstate_s3_key": "stream_alert_state/terraform.tfstate", "tfvars": "terraform.tfvars" } -} +} \ No newline at end of file diff --git a/conf/lambda.json b/conf/lambda.json index c2f0933ea..4871b980e 100644 --- a/conf/lambda.json +++ b/conf/lambda.json @@ -87,4 +87,4 @@ "source_current_hash": "", "source_object_key": "" } -} +} \ No newline at end of file diff --git a/stream_alert/alert_processor/main.py b/stream_alert/alert_processor/main.py index 1c0fb317d..a641c1392 100644 --- a/stream_alert/alert_processor/main.py +++ b/stream_alert/alert_processor/main.py @@ -14,7 +14,6 @@ limitations under the License. 
""" from __future__ import absolute_import # Suppresses RuntimeWarning import error in Lambda -import json import os from stream_alert.alert_processor import LOGGER diff --git a/stream_alert_cli/config.py b/stream_alert_cli/config.py index 5f7160cb5..e22988426 100644 --- a/stream_alert_cli/config.py +++ b/stream_alert_cli/config.py @@ -13,13 +13,12 @@ See the License for the specific language governing permissions and limitations under the License. """ -from collections import OrderedDict import json import os import re from app_integrations.apps.app_base import StreamAlertApp -from stream_alert.shared import metrics +from stream_alert.shared import config, metrics from stream_alert_cli.helpers import continue_prompt from stream_alert_cli.logger import LOGGER_CLI from stream_alert_cli.apps import save_app_auth_info @@ -31,12 +30,11 @@ class CLIConfigError(Exception): class CLIConfig(object): """A class to load, modify, and display the StreamAlertCLI Config""" - DEFAULT_CONFIG_PATH = 'conf' + DEFAULT_CONFIG_PATH = 'conf/' - def __init__(self, **kwargs): - self.config_path = kwargs.get('config_path', self.DEFAULT_CONFIG_PATH) - self.config = {'clusters': {}} - self.load() + def __init__(self, config_path=DEFAULT_CONFIG_PATH): + self.config_path = config_path + self.config = config.load_config(config_path) def __repr__(self): return str(self.config) @@ -546,74 +544,25 @@ def add_threat_intel_downloader(self, ti_downloader_info): self.write() return True - def _config_reader(self, key, file_path, **kwargs): - """Read a given file into a config key - - Args: - key (str): The key in the config dictionary to place the loaded - config file. - file_path (str): The location on disk to load the config file. - - Keyword Args: - cluster_file (bool): If the file to load is a cluster file. - """ - # This accounts for non files passed in, such as a - # directory from os.listdir() - if not os.path.isfile(file_path): - return - - with open(file_path) as data: - try: - if kwargs.get('cluster_file', False): - self.config['clusters'][key] = json.load(data) - else: - # For certain log types (csv), the order of the schema - # must be retained. By loading as an OrderedDict, - # the configuration is guaranteed to keep its order. 
- if key == 'logs': - self.config[key] = json.load(data, object_pairs_hook=OrderedDict) - else: - self.config[key] = json.load(data) - except ValueError: - raise CLIConfigError('[Config Error]: {} is not valid JSON'.format(file_path)) - @staticmethod - def _config_writer(config, path, **kwargs): + def _config_writer(path, data, sort=True): with open(path, 'r+') as conf_file: - json.dump( - config, - conf_file, - indent=2, - separators=(',', ': '), - sort_keys=kwargs.get('sort_keys', True)) + json.dump(data, conf_file, indent=2, separators=(',', ': '), sort_keys=sort) conf_file.truncate() - def load(self): - """Load all files found under conf, including cluster configurations""" - # Load configuration files - config_files = [conf for conf in os.listdir(self.config_path) if conf.endswith('.json')] - for config_file in config_files: - config_key = os.path.splitext(config_file)[0] - file_path = os.path.join(self.config_path, config_file) - self._config_reader(config_key, file_path) - - # Load cluster files - for cluster_file in os.listdir(os.path.join(self.config_path, 'clusters')): - config_key = os.path.splitext(cluster_file)[0] - file_path = os.path.join(self.config_path, 'clusters', cluster_file) - self._config_reader(config_key, file_path, cluster_file=True) - def write(self): """Write the current config in memory to disk""" # Write loaded configuration files - for config_key in [key for key in self.config if key != 'clusters']: - file_path = os.path.join(self.config_path, '{}.json'.format(config_key)) - if config_key == 'logs': - self._config_writer(self.config[config_key], file_path, sort_keys=False) + def format_path(parts): + return '{}.json'.format(os.path.join(*parts)) + + for config_key in self.config: + path_parts = [self.config_path, config_key] + if config_key == 'clusters': + # Write loaded cluster files + for cluster_key in self.config['clusters']: + parts = path_parts + [cluster_key] + self._config_writer(format_path(parts), self.config['clusters'][cluster_key]) else: - self._config_writer(self.config[config_key], file_path) - - # Write loaded cluster files - for cluster_key in self.config['clusters']: - file_path = os.path.join(self.config_path, 'clusters', '{}.json'.format(cluster_key)) - self._config_writer(self.config['clusters'][cluster_key], file_path) + sort = config_key != 'logs' # logs.json should not be sorted + self._config_writer(format_path(path_parts), self.config[config_key], sort) diff --git a/tests/unit/stream_alert_cli/terraform/test_generate.py b/tests/unit/stream_alert_cli/terraform/test_generate.py index affd95ef2..f69f41548 100644 --- a/tests/unit/stream_alert_cli/terraform/test_generate.py +++ b/tests/unit/stream_alert_cli/terraform/test_generate.py @@ -314,8 +314,6 @@ def test_generate_cloudtrail_basic(self): 'enabled': True } result = cloudtrail.generate_cloudtrail(cluster_name, self.cluster_dict, self.config) - # Reload the config - self.config.load() assert_true(result) assert_equal(set(self.config['clusters']['advanced']['modules']['cloudtrail'].keys()), diff --git a/tests/unit/stream_alert_cli/terraform/test_s3_events.py b/tests/unit/stream_alert_cli/terraform/test_s3_events.py index d03aa8378..6e0a9e51b 100644 --- a/tests/unit/stream_alert_cli/terraform/test_s3_events.py +++ b/tests/unit/stream_alert_cli/terraform/test_s3_events.py @@ -29,8 +29,6 @@ def test_generate_s3_events_legacy(): 's3_bucket_id': 'unit-test-bucket.legacy.data' } result = s3_events.generate_s3_events('test', cluster_dict, CONFIG) - # Reload the config - CONFIG.load() 
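Stepping back from the test hunks: after patch 13, `CLIConfig` is a thin wrapper over the shared loader. It loads once in `__init__`, and `write()` maps each top-level key back to `conf/<key>.json`, with clusters under `conf/clusters/<name>.json`. A hedged sketch, assuming the dict-style access the CLI tests use (e.g. `CONFIG['clusters']['test']`):

```python
# Sketch of the refactored CLIConfig; format_path mirrors the helper in write().
import os

from stream_alert_cli.config import CLIConfig

config = CLIConfig()                                    # loads from 'conf/'
test_config = CLIConfig(config_path='tests/unit/conf')  # unit-test fixtures

def format_path(parts):
    return '{}.json'.format(os.path.join(*parts))

assert format_path(['conf/', 'global']) == 'conf/global.json'
assert format_path(['conf/', 'clusters', 'prod']) == 'conf/clusters/prod.json'
```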
assert_true(result) assert_equal(CONFIG['clusters']['test']['modules']['s3_events'], From b29a6eb8fdf1c16fdd437a8243bc9d2096dfb97a Mon Sep 17 00:00:00 2001 From: Ryan Deivert Date: Wed, 25 Apr 2018 22:40:12 -0700 Subject: [PATCH 14/15] fixing test failures --- stream_alert/shared/config.py | 2 -- .../test_threat_intel.py | 22 +++++-------------- 2 files changed, 6 insertions(+), 18 deletions(-) diff --git a/stream_alert/shared/config.py b/stream_alert/shared/config.py index 24f9eedeb..3dfc08551 100644 --- a/stream_alert/shared/config.py +++ b/stream_alert/shared/config.py @@ -72,7 +72,6 @@ def load_config(conf_dir='conf/', **kwargs): exclude (set): Names of config files or folders that should not be loaded include (set): Names of specific config files to only load validate (bool): Validate aspects of the config to check for user error - """ default_files = {file for file in os.listdir(conf_dir) if file.endswith('.json')} conf_files = kwargs.get('include', default_files).copy() @@ -125,7 +124,6 @@ def _validate_config(config): """ # Check the log declarations if 'logs' in config: - for log, attrs in config['logs'].iteritems(): if 'schema' not in attrs: raise ConfigError('The \'schema\' is missing for {}'.format(log)) diff --git a/tests/unit/stream_alert_rule_processor/test_threat_intel.py b/tests/unit/stream_alert_rule_processor/test_threat_intel.py index a89d62d78..0e3c4724a 100644 --- a/tests/unit/stream_alert_rule_processor/test_threat_intel.py +++ b/tests/unit/stream_alert_rule_processor/test_threat_intel.py @@ -513,22 +513,12 @@ def test_load_from_config(self): assert_equal(StreamThreatIntel.normalized_type_mapping(), expected_result) def test_load_from_config_with_cluster_env(self): - """Threat Intel - Test load_from_config to read cluster env variable""" - with patch.dict('os.environ', {'CLUSTER': 'advanced'}): - config = load_config('tests/unit/conf') - config['global']['threat_intel']['enabled'] = True - threat_intel = StreamThreatIntel.load_from_config(config) - assert_is_instance(threat_intel, StreamThreatIntel) - assert_equal(config['clusters'].keys(), ['advanced']) - - def test_load_from_config_with_cluster_env_2(self): - """Threat Intel - Test load_from_config with threat intel disabled in cluster""" - with patch.dict('os.environ', {'CLUSTER': 'test'}): - config = load_config('tests/unit/conf') - config['global']['threat_intel']['enabled'] = True - threat_intel = StreamThreatIntel.load_from_config(config) - assert_false(isinstance(threat_intel, StreamThreatIntel)) - assert_equal(config['clusters'].keys(), ['test']) + """Threat Intel - Test load_from_config to read cluster""" + config = load_config('tests/unit/conf') + config['global']['threat_intel']['enabled'] = True + threat_intel = StreamThreatIntel.load_from_config(config) + assert_is_instance(threat_intel, StreamThreatIntel) + assert_true('advanced' in config['clusters'].keys()) def test_process_types_config(self): """Threat Intel - Test process_types_config method""" From f2e1419d9f89f085f16f76a4de851d2b8433d398 Mon Sep 17 00:00:00 2001 From: Ryan Deivert Date: Thu, 26 Apr 2018 16:31:36 -0700 Subject: [PATCH 15/15] addressing pr feedback --- stream_alert/shared/config.py | 68 ++++++++++++++++++++++++++--------- 1 file changed, 52 insertions(+), 16 deletions(-) diff --git a/stream_alert/shared/config.py b/stream_alert/shared/config.py index 3dfc08551..e66bddb59 100644 --- a/stream_alert/shared/config.py +++ b/stream_alert/shared/config.py @@ -32,7 +32,7 @@ def parse_lambda_arn(function_arn): Example: 
arn:aws:lambda:aws-region:acct-id:function:stream_alert:production
 
-    Arguments:
+    Args:
         function_arn (str): The AWS Lambda function ARN
 
     Returns:
@@ -53,7 +53,7 @@ def parse_lambda_arn(function_arn):
     }
 
 
-def load_config(conf_dir='conf/', **kwargs):
+def load_config(conf_dir='conf/', exclude=None, include=None, validate=False):
     """Load the configuration for StreamAlert.
 
     All configuration files live in the `conf` directory in JSON format.
@@ -65,46 +65,76 @@ def load_config(conf_dir='conf/', **kwargs):
     key denotes the name of the log type, and includes 'keys' used to match
     rules to log fields.
 
-    Arguments:
+    Args:
         conf_dir (str): [optional] Path from which to load the config
+        exclude (set): [optional] Names of config files or folders that should not be loaded
+        include (set): [optional] Names of specific config files to only load
+        validate (bool): [optional] Validate aspects of the config to check for user error
 
-    Keyword Arguemnts:
-        exclude (set): Names of config files or folders that should not be loaded
-        include (set): Names of specific config files to only load
-        validate (bool): Validate aspects of the config to check for user error
+    Raises:
+        ConfigError: Raised if errors occur with configuration file loading
+
+    Returns:
+        dict: Loaded configuration in dictionary form. Example:
+            {
+                'clusters': {
+                    'prod': <prod.json contents>
+                },
+                'global': <global.json contents>,
+                'lambda': <lambda.json contents>,
+                'logs': <logs.json contents>,
+                'outputs': <outputs.json contents>,
+                'sources': <sources.json contents>,
+                'types': <types.json contents>,
+            }
     """
     default_files = {file for file in os.listdir(conf_dir) if file.endswith('.json')}
-    conf_files = kwargs.get('include', default_files).copy()
+    conf_files = (include or default_files).copy()
    include_clusters = 'clusters' in conf_files
 
     conf_files.intersection_update(default_files)
-    if not (conf_files or include_clusters):
-        raise ConfigError('No config files to load')
-
-    exclusions = kwargs.get('exclude', set())
+    exclusions = exclude or set()
     conf_files = conf_files.difference(exclusions)
 
+    if not (conf_files or include_clusters):
+        available_files = ', '.join("'{}'".format(name) for name in sorted(default_files))
+        raise ConfigError('No config files to load. This is likely due to the misuse of '
+                          'the \'include\' or \'exclude\' keyword arguments. Available '
+                          'files are: {}, and clusters'.format(available_files))
+
     config = defaultdict(dict)
     for name in conf_files:
         path = os.path.join(conf_dir, name)
         # we use object_pairs_hook=OrderdDict to preserve schema order for CSV/KV log types
-        config[os.path.splitext(name)[0]] = _load_json_file(path, True)
+        config[os.path.splitext(name)[0]] = _load_json_file(path, name == 'logs.json')
 
     # Load the configs for clusters if it is not excluded
-    if 'clusters' not in exclusions and not kwargs.get('include') or include_clusters:
+    if 'clusters' not in exclusions and not include or include_clusters:
         clusters = {file for file in os.listdir(os.path.join(conf_dir, 'clusters'))
                     if file.endswith('.json')}
         for cluster in clusters:
             cluster_path = os.path.join(conf_dir, 'clusters', cluster)
             config['clusters'][os.path.splitext(cluster)[0]] = _load_json_file(cluster_path)
 
-    if kwargs.get('validate'):
+    if validate:
         _validate_config(config)
 
     return config
 
+
 def _load_json_file(path, ordered=False):
-    """Helper to return the loaded json from a given path"""
+    """Helper to return the loaded json from a given path
+
+    Args:
+        path (str): Relative path to config file being loaded
+        ordered (bool): [optional] Boolean that indicates if the loaded JSON
+            file should have its order maintained, using object_pairs_hook=OrderedDict
+
+    Returns:
+        dict: The loaded contents of the JSON file specified by path
+
+    Raises:
+        ConfigError: Raised if any ValueErrors occur during json.load(...)
+    """
     kwargs = {'object_pairs_hook': OrderedDict if ordered else None}
     with open(path) as data:
         try:
@@ -121,6 +151,12 @@ def _validate_config(config):
     Checks for `sources.json`
       - the sources contains either kinesis or s3 keys
       - each sources has a list of logs declared
+
+    Args:
+        config (dict): The loaded configuration dictionary
+
+    Raises:
+        ConfigError: Raised if any config validation errors occur
     """
     # Check the log declarations
     if 'logs' in config:
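With patch 15 applied, the loader's public surface is the explicit keyword arguments above. A hedged usage sketch; the file names refer to the JSON files under `conf/`:

```python
from stream_alert.shared.config import ConfigError, load_config

config = load_config()                           # everything under conf/, no validation
logs_only = load_config(include={'logs.json'})   # just conf/logs.json, clusters skipped
no_clusters = load_config(exclude={'clusters'})  # everything except conf/clusters/
validated = load_config(validate=True)           # raises ConfigError on schema/parser gaps

try:
    load_config(include={'no-such-file.json'})   # hypothetical name matching nothing
except ConfigError:
    pass  # the new error message lists the files that were available
```

And the reason only `logs.json` is loaded ordered: CSV and key-value schemas depend on field order, which a plain `dict` in Python 2 does not preserve. A toy illustration of the `object_pairs_hook` behavior:

```python
import json
from collections import OrderedDict

ordered = json.loads('{"b": 1, "a": 2}', object_pairs_hook=OrderedDict)
assert list(ordered.keys()) == ['b', 'a']  # file order preserved for schemas
```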