diff --git a/streamalert_cli/test/event_file.py b/streamalert_cli/test/event_file.py index cc0564d48..a9475bbc2 100644 --- a/streamalert_cli/test/event_file.py +++ b/streamalert_cli/test/event_file.py @@ -44,6 +44,10 @@ def __str__(self): return '\n'.join(str(item) for item in output) + @property + def path(self): + return self._full_path + @property def should_print(self): return any(not result.suppressed for result in self._results) diff --git a/streamalert_cli/test/handler.py b/streamalert_cli/test/handler.py index 3cb307022..545daae18 100644 --- a/streamalert_cli/test/handler.py +++ b/streamalert_cli/test/handler.py @@ -25,6 +25,7 @@ from streamalert.classifier import classifier from streamalert.rules_engine import rules_engine from streamalert.shared import rule +from streamalert.shared.config import ConfigError from streamalert.shared.logger import get_logger from streamalert.shared.stats import RuleStatisticTracker from streamalert_cli.helpers import check_credentials @@ -229,7 +230,6 @@ def __init__(self, options, config): self._failed = 0 prefix = self._config['global']['account']['prefix'] env = { - 'CLUSTER': 'prod', 'STREAMALERT_PREFIX': prefix, 'AWS_ACCOUNT_ID': self._config['global']['account']['aws_account_id'], 'ALERTS_TABLE': '{}_streamalert_alerts'.format(prefix), @@ -238,10 +238,7 @@ def __init__(self, options, config): if 'stats' in options and options.stats: env['STREAMALERT_TRACK_RULE_STATS'] = '1' - patch.dict( - os.environ, - env - ).start() + patch.dict(os.environ, env).start() @staticmethod def _run_classification(record): @@ -328,13 +325,43 @@ def _process_test_file(self, test_file_path): # Iterate over the individual test events in the file event_file = TestEventFile(test_file_path) for event in event_file.process_file(self._config, self._verbose, self._testing_rules): - # Set the cluster in the env since this is used from within the - # classifier to load the proper cluster config + # Each test event should be tied to a cluster, via the configured data_sources + # Reset the CLUSTER env var for each test, since it could differ between each event + # This env var is used from within the classifier to load the proper cluster config + if 'CLUSTER' in os.environ: + del os.environ['CLUSTER'] + for cluster_name, cluster_value in self._config['clusters'].items(): - for service in cluster_value['data_sources'].values(): - if event.source in service: - os.environ['CLUSTER'] = cluster_name - break + if event.service not in cluster_value['data_sources']: + LOGGER.debug( + 'Cluster "%s" does not have service "%s" configured as a data source', + cluster_name, + event.service + ) + continue + + sources = set(cluster_value['data_sources'][event.service]) + if event.source not in sources: + LOGGER.debug( + 'Cluster "%s" does not have the source "%s" configured as a data source ' + 'for service "%s"', + cluster_name, + event.source, + event.service + ) + continue + + # If we got here, then this cluster is actually configured for this data source + os.environ['CLUSTER'] = cluster_name + break + + # A misconfigured test event and/or cluster config can cause this to be unset + if 'CLUSTER' not in os.environ: + error = ( + 'Test event\'s "service" ({}) and "source" ({}) are not defined within ' + 'the "data_sources" of any configured clusters: {}:{}' + ).format(event.service, event.source, event_file.path, event.index) + raise ConfigError(error) classifier_result = self._run_classification(event.record) diff --git a/streamalert_cli/test/results.py b/streamalert_cli/test/results.py index bda1ca67a..58f9f8ec3 100644 --- a/streamalert_cli/test/results.py +++ b/streamalert_cli/test/results.py @@ -173,6 +173,10 @@ def __str__(self): __repr__ = __str__ + @property + def index(self): + return self._index + @property def _disabled_rules(self): return sorted(set(self.trigger_rules).intersection( diff --git a/tests/unit/streamalert_cli/test/helpers.py b/tests/unit/streamalert_cli/test/helpers.py index da6162fd9..6541f4976 100644 --- a/tests/unit/streamalert_cli/test/helpers.py +++ b/tests/unit/streamalert_cli/test/helpers.py @@ -19,8 +19,8 @@ @nottest -def basic_test_file_json(): - return json.dumps([basic_test_event_data()]) +def basic_test_file_json(**kwargs): + return json.dumps([basic_test_event_data(**kwargs)]) @nottest diff --git a/tests/unit/streamalert_cli/test/test_handler.py b/tests/unit/streamalert_cli/test/test_handler.py new file mode 100644 index 000000000..d00ebe883 --- /dev/null +++ b/tests/unit/streamalert_cli/test/test_handler.py @@ -0,0 +1,130 @@ +""" +Copyright 2017-present Airbnb, Inc. + +Licensed under the Apache License, Version 2.0 (the 'License'); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an 'AS IS' BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" +from io import StringIO +import os + +import mock +from mock import patch, MagicMock, Mock +from nose.tools import assert_equal, assert_raises, nottest +from pyfakefs import fake_filesystem_unittest + +from streamalert.shared.config import load_config +from streamalert.shared.exceptions import ConfigError +from streamalert_cli.config import CLIConfig +from streamalert_cli.test.handler import TestRunner +from tests.unit.streamalert_cli.test.helpers import basic_test_file_json + +# Keep nose from trying to treat this as a test +TestRunner = nottest(TestRunner) + + +class TestTestRunner(fake_filesystem_unittest.TestCase): + """Test the TestEventFile class""" + # pylint: disable=protected-access + + TEST_CONFIG_PATH = 'tests/unit/conf' + _DEFAULT_EVENT_PATH = 'rules/community/unit_test/file.json' + + def setUp(self): + cli_config = CLIConfig(config_path='tests/unit/conf') + with patch('streamalert.rules_engine.rules_engine.load_config', + Mock(return_value=load_config(self.TEST_CONFIG_PATH))): + self.runner = TestRunner(MagicMock(), cli_config) + + self.setUpPyfakefs() + + @patch('logging.Logger.debug') + def test_process_test_file_bad_service(self, log_mock): + """StreamAlert CLI - TestRunner Process Test File, Misconfigured Service""" + self.fs.create_file( + self._DEFAULT_EVENT_PATH, + contents=basic_test_file_json( + log='unit_test_simple_log', + source='unit_test_default_stream', + service='s3' # s3 here is a misconfiguration, should be kinesis + ) + ) + + assert_raises(ConfigError, self.runner._process_test_file, self._DEFAULT_EVENT_PATH) + log_mock.assert_has_calls([ + mock.call( + 'Cluster "%s" does not have service "%s" configured as a data source', + 'advanced', + 's3' + ), + mock.call( + 'Cluster "%s" does not have service "%s" configured as a data source', + 'test', + 's3' + ), + mock.call( + 'Cluster "%s" does not have service "%s" configured as a data source', + 'trusted', + 's3' + ) + ], any_order=True) + + @patch('logging.Logger.debug') + def test_process_test_file_bad_source(self, log_mock): + """StreamAlert CLI - TestRunner Process Test File, Misconfigured Source""" + self.fs.create_file( + self._DEFAULT_EVENT_PATH, + contents=basic_test_file_json( + log='unit_test_simple_log', + source='nonexistent_source', # invalid source here + service='kinesis' + ) + ) + + assert_raises(ConfigError, self.runner._process_test_file, self._DEFAULT_EVENT_PATH) + log_mock.assert_has_calls([ + mock.call( + 'Cluster "%s" does not have service "%s" configured as a data source', + 'advanced', + 'kinesis' + ), + mock.call( + 'Cluster "%s" does not have service "%s" configured as a data source', + 'trusted', + 'kinesis' + ), + mock.call( + 'Cluster "%s" does not have the source "%s" configured as a data source ' + 'for service "%s"', + 'test', + 'nonexistent_source', + 'kinesis' + ), + ], any_order=True) + + @patch('sys.stdout', new=StringIO()) # patch stdout to suppress integration test result + def test_process_test_file(self): + """StreamAlert CLI - TestRunner Process Test File""" + self.fs.create_file( + self._DEFAULT_EVENT_PATH, + contents=basic_test_file_json( + log='unit_test_simple_log', + source='unit_test_default_stream', # valid source + service='kinesis' # valid service + ) + ) + self.fs.add_real_directory(self.TEST_CONFIG_PATH) + with patch('streamalert.classifier.classifier.config.load_config', + Mock(return_value=load_config(self.TEST_CONFIG_PATH))): + self.runner._process_test_file(self._DEFAULT_EVENT_PATH) + + # The CLUSTER env var should be properly deduced and set now + assert_equal(os.environ['CLUSTER'], 'test')