From 16c624b534dd7b8dacd668dc94cc46f821294ee2 Mon Sep 17 00:00:00 2001 From: Ryan Deivert Date: Fri, 16 Jun 2017 17:52:30 -0700 Subject: [PATCH 01/14] [lambda][rule] removing sns dependency from the rule processor in favor of direct lambda invocation to the alert processor --- stream_alert/rule_processor/sink.py | 135 +++++++++++----------------- 1 file changed, 51 insertions(+), 84 deletions(-) diff --git a/stream_alert/rule_processor/sink.py b/stream_alert/rule_processor/sink.py index 488cee5e2..e2ea416ab 100644 --- a/stream_alert/rule_processor/sink.py +++ b/stream_alert/rule_processor/sink.py @@ -13,7 +13,6 @@ See the License for the specific language governing permissions and limitations under the License. ''' - import json import logging import sys @@ -21,30 +20,34 @@ import boto3 from botocore.exceptions import ClientError -_SNS_MAX_SIZE = (256*1024) - logging.basicConfig() LOGGER = logging.getLogger('StreamAlert') -def json_dump(sns_dict, indent_value=None): +def _json_dump(alert, indent_value=None): def json_dict_serializer(obj): """Helper method for marshalling dictionary objects to JSON""" return obj.__dict__ try: - return json.dumps(sns_dict, indent=indent_value, default=json_dict_serializer) + return json.dumps(alert, indent=indent_value, default=json_dict_serializer) except AttributeError as err: LOGGER.error('An error occurred while dumping object to JSON: %s', err) return "" -class SNSMessageSizeError(Exception): - pass class StreamSink(object): + """StreamSink class is used for sending actual alerts to the alert processor""" def __init__(self, env): + """StreamSink initializer + + Args: + env [dict]: loaded dictionary containing environment information + """ self.env = env - self.client_sns = boto3.client('sns', region_name=self.env['lambda_region']) - self.topic = self._get_sns_topic_arn() + self.client_lambda = boto3.client('lambda', + region_name=self.env['lambda_region']) + self.function = self.env['lambda_function_name'].replace( + '_streamalert_rule_processor', '_streamalert_alert_processor') def sink(self, alerts): """Sink triggered alerts from the StreamRules engine. @@ -52,82 +55,46 @@ def sink(self, alerts): Args: alerts [list]: a list of dictionaries representating json alerts - Sends a message to SNS with the following JSON format: - {default: [ - { - 'record': record, - 'metadata': { - 'rule_name': rule.rule_name, - 'rule_description': rule.rule_function.__doc__, - 'log': str(payload.log_source), - 'outputs': rule.outputs, - 'type': payload.type, - 'source': { - 'service': payload.service, - 'entity': payload.entity - } + Sends a message to the alert processor with the following JSON format: + { + "record": record, + "metadata": { + "rule_name": rule.rule_name, + "rule_description": rule.rule_function.__doc__, + "log": str(payload.log_source), + "outputs": rule.outputs, + "type": payload.type, + "source": { + "service": payload.service, + "entity": payload.entity } } - ]} + } """ for alert in alerts: - sns_dict = {'default': alert} - self.publish_message(json_dump(sns_dict)) - - def _get_sns_topic_arn(self): - """Return a properly formatted SNS ARN. - - Args: - region: Which AWS region the SNS topic exists in. - topic: The name of the SNS topic. 
- """ - topic = self.env['lambda_function_name'].replace('_streamalert_rule_processor', - '_streamalerts') - - return 'arn:aws:sns:{region}:{account_id}:{topic}'.format( - region=self.env['lambda_region'], - account_id=self.env['account_id'], - topic=topic - ) - - @staticmethod - def _sns_message_size_check(message): - """Verify the SNS message is less than or equal to 256KB (SNS Limit) - Args: - message: A JSON string containing an alert to send to SNS. - - Returns: - Boolean result of if the message is within the size constraint - """ - message_size = sys.getsizeof(message) - return 0 < message_size <= _SNS_MAX_SIZE - - def publish_message(self, message): - """Emit a message to SNS. - - Args: - client: The boto3 client object. - message: A JSON string containing a serialized alert. - topic: The SNS topic ARN to send to. - """ - if not self._sns_message_size_check(message): - LOGGER.error('Cannot publish Alerts, message size is too big!') - raise SNSMessageSizeError('SNS message size is too big! (Max: 256KB)') - - try: - response = self.client_sns.publish( - TopicArn=self.topic, - Message=message, - Subject='StreamAlert Rules Triggered' - ) - except ClientError: - LOGGER.exception('An error occurred while publishing alert to sns') - return - - if response['ResponseMetadata']['HTTPStatusCode'] != 200: - LOGGER.error('Failed to publish message to sns topic: %s', self.topic.split(':')[-1]) - return - - if self.env['lambda_alias'] != 'development': - LOGGER.info('Published alert to %s', self.topic.split(':')[-1]) - LOGGER.info('SNS MessageID: %s', response['MessageId']) + data = _json_dump(alert) + + try: + response = self.client_lambda.invoke( + FunctionName=self.function, + InvocationType='Event', + Payload=data + ) + + except ClientError as err: + LOGGER.exception('An error occurred while sending alert to ' + '\'%s\'. Error is: %s. 
Alert: %s',
+                                 self.function,
+                                 err.response,
+                                 data)
+                return
+
+            if response['ResponseMetadata']['HTTPStatusCode'] != 202:
+                LOGGER.error('Failed to send alert to \'%s\': %s',
+                             self.function, data)
+                return
+
+            if self.env['lambda_alias'] != 'development':
+                LOGGER.info('Sent alert to \'%s\' with Lambda request ID \'%s\'',
+                            self.function,
+                            response['ResponseMetadata']['RequestId'])
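
Note: for reference, the asynchronous invocation contract this patch relies on looks roughly like the sketch below. The function name and alert payload are illustrative assumptions, not values from a deployed StreamAlert cluster:

```python
"""Minimal sketch of an asynchronous Lambda-to-Lambda invocation (illustrative)."""
import json

import boto3

client = boto3.client('lambda', region_name='us-east-1')

# A minimal stand-in for a StreamAlert alert payload
alert = {'record': {'key': 'value'}, 'metadata': {'rule_name': 'example_rule'}}

response = client.invoke(
    # Hypothetical function name following the <prefix>_<cluster> convention
    FunctionName='prefix_cluster_streamalert_alert_processor',
    # 'Event' queues the call and returns immediately, which is why the sink
    # checks for HTTP 202 (Accepted) rather than 200
    InvocationType='Event',
    Payload=json.dumps(alert),
)

# Without a Qualifier argument, the unqualified function ($LATEST) is
# invoked rather than a published alias such as 'production'
assert response['ResponseMetadata']['HTTPStatusCode'] == 202
```

From bc59d64ef3b297c54f323e09390a893429e49ea1 Mon Sep 17 00:00:00 2001
From: Ryan Deivert
Date: Fri, 16 Jun 2017 18:03:44 -0700
Subject: [PATCH 02/14] [lambda][alert] updating alert processor side for rule
 processor sns changes

---
 stream_alert/alert_processor/main.py | 75 ++++++++++------------------
 1 file changed, 25 insertions(+), 50 deletions(-)

diff --git a/stream_alert/alert_processor/main.py b/stream_alert/alert_processor/main.py
index 0cfa161c0..1af36a2d7 100644
--- a/stream_alert/alert_processor/main.py
+++ b/stream_alert/alert_processor/main.py
@@ -24,6 +24,7 @@
 LOGGER = logging.getLogger('StreamAlertOutput')
 LOGGER.setLevel(logging.DEBUG)
 
+
 def handler(event, context):
     """StreamAlert Alert Processor
 
@@ -40,10 +41,8 @@ def handler(event, context):
         indicates if sending was successful and the second value is the output
         configuration info (ie - 'slack:sample_channel')
     """
-    records = event.get('Records', [])
-    LOGGER.info('Running alert processor for %d records', len(records))
-
-    # A failure to load the config will log the error in load_output_config and return here
+    # A failure to load the config will log the error in load_output_config
+    # and return here
     config = _load_output_config()
     if not config:
         return
@@ -51,57 +50,32 @@ def handler(event, context):
     region = context.invoked_function_arn.split(':')[3]
     function_name = context.function_name
 
-    status_values = []
-
-    for record in records:
-        sns_payload = record.get('Sns')
-        if not sns_payload:
-            continue
-
-        sns_message = sns_payload['Message']
-        try:
-            loaded_sns_message = json.loads(sns_message)
-        except ValueError as err:
-            LOGGER.error('An error occurred while decoding message to JSON: %s', err)
-            continue
-
-        if not 'default' in loaded_sns_message:
-            # do not log for messages related to alarms
-            if not 'AlarmName' in loaded_sns_message:
-                LOGGER.error('Malformed SNS: %s', loaded_sns_message)
-            continue
-
-        status_values.extend(run(loaded_sns_message, region, function_name, config))
+    # Yield back the current status to the caller
+    for status in run(event, region, function_name, config):
+        yield status
 
-    # Return the current status back to the caller
-    return status_values
 
-def run(loaded_sns_message, region, function_name, config):
+def run(alert, region, function_name, config):
     """Send an Alert to its described outputs.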
Args: - loaded_sns_message [dict]: SNS message dictionary with the following structure: - - { - 'default': alert - } - - The alert is another dict with the following structure: - - { - 'record': record, - 'metadata': { - 'rule_name': rule.rule_name, - 'rule_description': rule.rule_function.__doc__, - 'log': str(payload.log_source), - 'outputs': rule.outputs, - 'type': payload.type, - 'source': { - 'service': payload.service, - 'entity': payload.entity + alert [dict]: dictionary representating an alert with the + following structure: + + { + 'record': record, + 'metadata': { + 'rule_name': rule.rule_name, + 'rule_description': rule.rule_function.__doc__, + 'log': str(payload.log_source), + 'outputs': rule.outputs, + 'type': payload.type, + 'source': { + 'service': payload.service, + 'entity': payload.entity + } } } - } region [string]: the AWS region being used function_name [string]: the name of the lambda function @@ -110,8 +84,7 @@ def run(loaded_sns_message, region, function_name, config): Returns: [generator] yields back dispatch status and name of the output to the handler """ - LOGGER.debug(loaded_sns_message) - alert = loaded_sns_message['default'] + LOGGER.debug('Sending alert to outputs:\n%s', json.dumps(alert, indent=2)) rule_name = alert['metadata']['rule_name'] # strip out unnecessary keys and sort @@ -154,6 +127,7 @@ def run(loaded_sns_message, region, function_name, config): # Yield back the result to the handler yield sent, output + def _sort_dict(unordered_dict): """Recursively sort a dictionary @@ -173,6 +147,7 @@ def _sort_dict(unordered_dict): return result + def _load_output_config(config_path='conf/outputs.json'): """Load the outputs configuration file from disk From 3d352e68c1fc1077bda72d410e0e60feb9d189f0 Mon Sep 17 00:00:00 2001 From: Ryan Deivert Date: Fri, 16 Jun 2017 18:12:18 -0700 Subject: [PATCH 03/14] [tests] updating all tests to go with previous changes --- stream_alert_cli/test.py | 14 ++++-- .../stream_alert_alert_processor/helpers.py | 50 +++++++++---------- .../stream_alert_alert_processor/test_main.py | 48 ++++++------------ .../test_outputs.py | 14 +++--- .../stream_alert_rule_processor/test_sink.py | 39 +++++---------- 5 files changed, 69 insertions(+), 96 deletions(-) diff --git a/stream_alert_cli/test.py b/stream_alert_cli/test.py index 7e630b4be..e214b8a33 100644 --- a/stream_alert_cli/test.py +++ b/stream_alert_cli/test.py @@ -337,11 +337,15 @@ def test_processor(self, alerts): # Set the logger level to info so its not too noisy StreamOutput.LOGGER.setLevel(logging.ERROR) for alert in alerts: - # Establish the mocked outputs if the context is being mocked - if self.context.mocked: - self.setup_outputs(alert) - event = {'Records': [{'Sns': {'Message': json.dumps({'default': alert})}}]} - for passed, output in StreamOutput.handler(event, self.context): + outputs = alert['metadata'].get('outputs', []) + self.setup_outputs(outputs, url_mock) + event = json.dumps(alert) + context = Mock() + context.invoked_function_arn = ( + 'arn:aws:lambda:us-east-1:0123456789012:' + 'function:streamalert_alert_processor:production') + context.function_name = 'test_streamalert_alert_processor' + for passed, output in StreamOutput.handler(event, context): status = status and passed service, descriptor = output.split(':') message = 'sending alert to \'{}\''.format(descriptor) diff --git a/test/unit/stream_alert_alert_processor/helpers.py b/test/unit/stream_alert_alert_processor/helpers.py index c01f6cf9f..0987eb17b 100644 --- 
a/test/unit/stream_alert_alert_processor/helpers.py +++ b/test/unit/stream_alert_alert_processor/helpers.py @@ -36,7 +36,7 @@ def _construct_event(count): event = {'Records': []} for index in range(count): event['Records'] = event['Records'] + \ - [{'Sns': {'Message': json.dumps(_get_sns_message(index))}}] + [{'Sns': {'Message': json.dumps(_get_alert(index))}}] return event @@ -76,33 +76,31 @@ def _get_random_alert(key_count, rule_name, omit_rule_desc=False): return alert -def _get_sns_message(index): +def _get_alert(index=0): return { - 'default': { - 'record': { - 'test_index': index, - 'compressed_size': '9982', - 'timestamp': '1496947381.18', - 'node_id': '1', - 'cb_server': 'cbserver', - 'size': '21504', - 'type': 'binarystore.file.added', - 'file_path': '/tmp/5DA/AD8/0F9AA55DA3BDE84B35656AD8911A22E1.zip', - 'md5': '0F9AA55DA3BDE84B35656AD8911A22E1' + 'record': { + 'test_index': index, + 'compressed_size': '9982', + 'timestamp': '1496947381.18', + 'node_id': '1', + 'cb_server': 'cbserver', + 'size': '21504', + 'type': 'binarystore.file.added', + 'file_path': '/tmp/5DA/AD8/0F9AA55DA3BDE84B35656AD8911A22E1.zip', + 'md5': '0F9AA55DA3BDE84B35656AD8911A22E1' + }, + 'metadata': { + 'log': 'carbonblack:binarystore.file.added', + 'rule_name': 'cb_binarystore_file_added', + 'outputs': [ + 'slack:unit_test_channel' + ], + 'source': { + 'service': 's3', + 'entity': 'corp-prefix.prod.cb.region' }, - 'metadata': { - 'log': 'carbonblack:binarystore.file.added', - 'rule_name': 'cb_binarystore_file_added', - 'outputs': [ - 'slack:unit_test_channel' - ], - 'source': { - 'service': 's3', - 'entity': 'corp-prefix.prod.cb.region' - }, - 'type': 'json', - 'rule_description': 'Info about this rule and what actions to take' - } + 'type': 'json', + 'rule_description': 'Info about this rule and what actions to take' } } diff --git a/test/unit/stream_alert_alert_processor/test_main.py b/test/unit/stream_alert_alert_processor/test_main.py index 81460e5cd..37550837c 100644 --- a/test/unit/stream_alert_alert_processor/test_main.py +++ b/test/unit/stream_alert_alert_processor/test_main.py @@ -34,42 +34,26 @@ from unit.stream_alert_alert_processor.helpers import _get_mock_context -@patch('logging.Logger.error') -def test_handler_malformed_message(log_mock): - """Main handler decode failure logging""" - # The @patch() decorator allows us to 'introspect' what occurs down the chain - # and verify the params a function was LAST called with. 
For instance, here - # we are checking the last call to `logging.Logger.error` and verifying that the - # function was called with two params, the first being 'Malformed SNS: %s' and - # the second being the dictionary contained within `message` - # This call should happen at stream_alert/alert_processor/main.py:62 - context = _get_mock_context() - message = {'not_default': {'record': {'size': '9982'}}} - event = {'Records': [{'Sns': {'Message': json.dumps(message)}}]} - handler(event, context) - - log_mock.assert_called_with('Malformed SNS: %s', message) - - -@patch('logging.Logger.error') -def test_handler_bad_message(log_mock): - """Main handler decode failure logging""" - context = _get_mock_context() - event = {'Records': [{'Sns': {'Message': 'this\nvalue\nshould\nfail\nto\ndecode'}}]} - handler(event, context) - - assert_equal(str(log_mock.call_args_list[0]), - str(call('An error occurred while decoding message to JSON: %s', - ValueError('No JSON object could be decoded',)))) - - @patch('stream_alert.alert_processor.main.run') def test_handler_run(run_mock): """Main handler `run` call params""" context = _get_mock_context() - message = {'default': {'record': {'size': '9982'}}} - event = {'Records': [{'Sns': {'Message': json.dumps(message)}}]} - handler(event, context) + message = { + 'record': {'value': 'data'}, + 'metadata': { + 'rule_name': 'rule_name', + 'rule_description': 'rule_description', + 'log': 'log_source', + 'outputs': ['rule:output01', 'rule:output02'], + 'type': 'payload.type', + 'source': { + 'service': 'payload.service', + 'entity': 'payload.entity' + } + } + } + for _ in handler(message, context): + pass # This test will load the actual config, so we should compare the # function call against the same config here. diff --git a/test/unit/stream_alert_alert_processor/test_outputs.py b/test/unit/stream_alert_alert_processor/test_outputs.py index b791623fa..eacdfa29f 100644 --- a/test/unit/stream_alert_alert_processor/test_outputs.py +++ b/test/unit/stream_alert_alert_processor/test_outputs.py @@ -46,7 +46,7 @@ from unit.stream_alert_alert_processor.helpers import ( _get_random_alert, - _get_sns_message, + _get_alert, _remove_temp_secrets ) @@ -128,7 +128,7 @@ def _setup_dispatch(self): put_mock_creds(output_name, creds, self.__dispatcher.secrets_bucket, REGION, KMS_ALIAS) - return _get_sns_message(0)['default'] + return _get_alert() def _teardown_dispatch(self): """Replace method with cached method""" @@ -214,7 +214,7 @@ def _setup_dispatch(self, url): put_mock_creds(output_name, creds, self.__dispatcher.secrets_bucket, REGION, KMS_ALIAS) - return _get_sns_message(0)['default'] + return _get_alert() @patch('logging.Logger.info') @patch('urllib2.urlopen') @@ -451,7 +451,7 @@ def _setup_dispatch(self): put_mock_creds(output_name, creds, self.__dispatcher.secrets_bucket, REGION, KMS_ALIAS) - return _get_sns_message(0)['default'] + return _get_alert() @patch('logging.Logger.info') @patch('urllib2.urlopen') @@ -566,7 +566,7 @@ def _setup_dispatch(self): bucket = CONFIG[self.__service][self.__descriptor] boto3.client('s3', region_name=REGION).create_bucket(Bucket=bucket) - return _get_sns_message(0)['default'] + return _get_alert() @patch('logging.Logger.info') @mock_s3 @@ -605,8 +605,8 @@ def test_locals(self): def _setup_dispatch(self): """Helper for setting up LambdaOutput dispatch""" function_name = CONFIG[self.__service][self.__descriptor] - create_lambda_function(function_name, REGION) - return _get_sns_message(0)['default'] + _create_lambda_function(function_name, 
REGION)
+        return _get_alert()
 
     @mock_lambda
     @patch('logging.Logger.info')
diff --git a/test/unit/stream_alert_rule_processor/test_sink.py b/test/unit/stream_alert_rule_processor/test_sink.py
index 0380bb7ac..996c21fb2 100644
--- a/test/unit/stream_alert_rule_processor/test_sink.py
+++ b/test/unit/stream_alert_rule_processor/test_sink.py
@@ -38,38 +38,24 @@
     def teardown_class(cls):
         """Teardown the class after any methods"""
         cls.env = None
 
-    def test_sns_topic_arn(self):
-        """Sink SNS Messaging - Topic ARN"""
-        sinker = sink.StreamSink(self.env)
-        arn = sinker._get_sns_topic_arn()
-        assert_equal(arn, 'arn:aws:sns:us-east-1:123456789012:unittest_prod_streamalerts')
-
-    def test_message_size_check(self):
-        """Sink SNS Messaging - Message Blob Size Check"""
-        sinker = sink.StreamSink(self.env)
-        passed = sinker._sns_message_size_check(get_payload(1000))
-        assert_equal(passed, True)
-        passed = sinker._sns_message_size_check(get_payload((256*1024)+1))
-        assert_equal(passed, False)
-
     @staticmethod
     def test_json_from_dict():
         """Sink SNS Messaging - Dictionary to JSON Marshalling"""
         # Create a dictionary with an empty alert list
-        sns_dict = {"default": {}}
-        json_message = sink.json_dump(sns_dict)
+        alert = {"test": "alert"}
+        json_message = sink._json_dump(alert)
 
         # Test empty dictionary
-        assert_equal(json_message, '{"default": {}}')
+        assert_equal(json_message, '{"test": "alert"}')
 
         # Create a dictionary with a single alert in the list
-        sns_dict = {"default": {
-            'rule_name': "test_rule_01",
+        alert = {
             'record': {
                 'record_data_key01_01': "record_data_value01_01",
                 'record_data_key02_01': "record_data_value02_01"
             },
             'metadata': {
+                'rule_name': "test_rule_01",
                 'log': "payload_data_01",
                 'outputs': "rule.outputs_01",
                 'type': "payload_type_01",
@@ -78,16 +64,17 @@
                 'source': {
                     'service': "payload_service_01",
                     'entity': "payload_entity_01"
                 }
             }
-        }}
+        }
 
-        json_message = sink.json_dump(sns_dict)
+        json_message = sink._json_dump(alert)
 
         # Test with single alert entry
-        assert_equal(json_message, '{"default": {"rule_name": "test_rule_01", ' \
-            '"metadata": {"outputs": "rule.outputs_01", "type": "payload_type_01", ' \
-            '"log": "payload_data_01", "source": {"service": "payload_service_01", ' \
-            '"entity": "payload_entity_01"}}, "record": {"record_data_key02_01": ' \
-            '"record_data_value02_01", "record_data_key01_01": "record_data_value01_01"}}}')
+        assert_equal(json_message, '{"record": {"record_data_key02_01": ' \
+            '"record_data_value02_01", "record_data_key01_01": "record_data_value01_01"}, ' \
+            '"metadata": {"source": {"service": "payload_service_01", ' \
+            '"entity": "payload_entity_01"}, "rule_name": "test_rule_01", ' \
+            '"type": "payload_type_01", "log": "payload_data_01", ' \
+            '"outputs": "rule.outputs_01"}}')
 
 def get_payload(byte_size):
     """Returns a base64 encoded random payload of (roughly) byte_size length
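
Note: the exact-string assertions above encode CPython 2's arbitrary dict key order, which makes them brittle across interpreter versions. A deterministic alternative would pass `sort_keys` to `json.dumps`; a minimal sketch of the idea (a suggestion only, not part of these patches):

```python
"""Sketch: deterministic JSON expectations via sort_keys (suggestion only)."""
import json

alert = {
    'record': {'record_data_key01_01': 'record_data_value01_01'},
    'metadata': {'rule_name': 'test_rule_01'},
}

# sort_keys orders keys alphabetically, so the expected string no longer
# depends on the interpreter's dict ordering
expected = ('{"metadata": {"rule_name": "test_rule_01"}, '
            '"record": {"record_data_key01_01": "record_data_value01_01"}}')
assert json.dumps(alert, sort_keys=True) == expected
```

From 33694f6052adbf42f7e60faa0fe7ac0cfc9d37a3 Mon Sep 17 00:00:00 2001
From: Ryan Deivert
Date: Tue, 11 Jul 2017 23:12:59 -0700
Subject: [PATCH 04/14] [lambda][alert] rebasing and addition of helper to validate alert.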
tests included --- stream_alert/alert_processor/helpers.py | 104 +++++++++++++++ stream_alert/alert_processor/main.py | 11 +- stream_alert_cli/test.py | 14 +- .../test_helpers.py | 121 ++++++++++++++++++ .../test_outputs.py | 4 +- 5 files changed, 238 insertions(+), 16 deletions(-) create mode 100644 stream_alert/alert_processor/helpers.py create mode 100644 test/unit/stream_alert_alert_processor/test_helpers.py diff --git a/stream_alert/alert_processor/helpers.py b/stream_alert/alert_processor/helpers.py new file mode 100644 index 000000000..69ccc1234 --- /dev/null +++ b/stream_alert/alert_processor/helpers.py @@ -0,0 +1,104 @@ +''' +Copyright 2017-present, Airbnb Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +''' +import logging + +logging.basicConfig() +LOGGER = logging.getLogger('StreamAlertOutput') +LOGGER.setLevel(logging.DEBUG) + +def validate_alert(alert): + """Helper function to perform simple validatation of an alert's keys and structure + + Args: + alert [dict]: the alert record to test that should be in the form of a dict + + Returns: + [bool] a boolean value indicating whether or not the alert has the proper structure + """ + if not _validate_root(alert): + return False + + metadata_keys = {'log', 'rule_name', 'rule_description', 'type', 'source', 'outputs'} + if not set(alert['metadata'].keys()) == metadata_keys: + LOGGER.error('The value of the \'metadata\' key must be a map (dict) ' + 'that contains the following keys: %s', + ', '.join('\'{}\''.format(key) for key in metadata_keys)) + return False + + valid = True + for key in metadata_keys: + if key == 'source': + if not (isinstance(alert['metadata'][key], dict) and + set(alert['metadata'][key].keys()) == {'service', 'entity'}): + LOGGER.error('The value of the \'source\' key must be a map (dict) that ' + 'contains \'service\' and \'entity\' keys.') + valid = False + continue + + for entry in alert['metadata'][key].values(): + if not isinstance(entry, (str, unicode)): + LOGGER.error('The value of the \'%s\' key within \'%s\' must be ' + 'a string (str).', entry, key) + valid = False + continue + + elif key == 'outputs': + if not (isinstance(alert['metadata'][key], list) and + alert['metadata'][key]): + LOGGER.error( + 'The value of the \'outputs\' key must be an array (list) that ' + 'contains at least one configured output.') + valid = False + continue + + for entry in alert['metadata'][key]: + if not isinstance(entry, (str, unicode)): + LOGGER.error('The value of each entry in the \'outputs\' list ' + 'must be a string (str).') + valid = False + continue + + elif not isinstance(alert['metadata'][key], (str, unicode)): + LOGGER.error('The value of the \'%s\' key must be a string (str), not %s', + key, type(alert['metadata'][key])) + valid = False + continue + + return valid + +def _validate_root(alert): + """Private helper function to validate the root keys on an alert + + Args: + alert [dict]: the alert record to test that should be in the form of a dict + + Returns: + [bool] a boolean value indicating whether or not the expected root 
keys in + the alert exist and have the proper values + """ + if not (isinstance(alert, dict) and + set(alert.keys()) == {'record', 'metadata'}): + LOGGER.error('The alert must be a map (dict) that contains \'record\' ' + 'and \'metadata\' keys.') + return False + + if not (isinstance(alert['record'], dict) and + isinstance(alert['metadata'], dict)): + LOGGER.error('The value of both the \'record\' and \'metadata\' keys ' + 'must be a map (dict).') + return False + + return True diff --git a/stream_alert/alert_processor/main.py b/stream_alert/alert_processor/main.py index 1af36a2d7..87b86e865 100644 --- a/stream_alert/alert_processor/main.py +++ b/stream_alert/alert_processor/main.py @@ -18,6 +18,7 @@ from collections import OrderedDict +from stream_alert.alert_processor.helpers import validate_alert from stream_alert.alert_processor.outputs import get_output_dispatcher logging.basicConfig() @@ -50,10 +51,8 @@ def handler(event, context): region = context.invoked_function_arn.split(':')[3] function_name = context.function_name - # Yield back the current status to the caller - for status in run(event, region, function_name, config): - yield status - + # Return the current list of statuses back to the caller + return [status for status in run(event, region, function_name, config)] def run(alert, region, function_name, config): """Send an Alert to its described outputs. @@ -84,6 +83,10 @@ def run(alert, region, function_name, config): Returns: [generator] yields back dispatch status and name of the output to the handler """ + if not validate_alert(alert): + LOGGER.error('Invalid alert:\n%s', json.dumps(alert, indent=2)) + return + LOGGER.debug('Sending alert to outputs:\n%s', json.dumps(alert, indent=2)) rule_name = alert['metadata']['rule_name'] diff --git a/stream_alert_cli/test.py b/stream_alert_cli/test.py index e214b8a33..04785920d 100644 --- a/stream_alert_cli/test.py +++ b/stream_alert_cli/test.py @@ -328,7 +328,6 @@ def test_processor(self, alerts): Args: alerts [list]: list of alerts to be processed that have been fed in from the rule processor. - url_mock [mock.patch]: patch to mock out urlopen calls Return: [bool] boolean indicating the status of the alert processor dispatching @@ -337,15 +336,10 @@ def test_processor(self, alerts): # Set the logger level to info so its not too noisy StreamOutput.LOGGER.setLevel(logging.ERROR) for alert in alerts: - outputs = alert['metadata'].get('outputs', []) - self.setup_outputs(outputs, url_mock) - event = json.dumps(alert) - context = Mock() - context.invoked_function_arn = ( - 'arn:aws:lambda:us-east-1:0123456789012:' - 'function:streamalert_alert_processor:production') - context.function_name = 'test_streamalert_alert_processor' - for passed, output in StreamOutput.handler(event, context): + if self.context.mocked: + self.setup_outputs(alert) + + for passed, output in StreamOutput.handler(alert, self.context): status = status and passed service, descriptor = output.split(':') message = 'sending alert to \'{}\''.format(descriptor) diff --git a/test/unit/stream_alert_alert_processor/test_helpers.py b/test/unit/stream_alert_alert_processor/test_helpers.py new file mode 100644 index 000000000..35c169404 --- /dev/null +++ b/test/unit/stream_alert_alert_processor/test_helpers.py @@ -0,0 +1,121 @@ +''' +Copyright 2017-present, Airbnb Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +''' +from copy import deepcopy + +from nose.tools import assert_true, assert_false, assert_is_instance + + +from stream_alert.alert_processor.helpers import ( + validate_alert +) + + +def test_alert_validate_root(): + """Alert Structure Validation""" + # Default valid alert to be copied/modified + valid_alert = { + 'record': { + 'name': 'testing testing', + 'action': 'no action required' + }, + 'metadata': { + 'log': 'osquery', + 'rule_name': 'valid_alert_test', + 'outputs': [ + 'slack:alerts' + ], + 'source': { + 'service': 'kinesis', + 'entity': 'org_prefix_stream_alert_kinesis' + }, + 'type': 'json', + 'rule_description': 'Sample rule description \n\nthat can be multiple lines.\n' + } + } + + # Test with a valid alert structure + assert_true(validate_alert(valid_alert)) + + # Root key validation + invalid_root_keys = deepcopy(valid_alert) + invalid_root_keys.pop('metadata') + + # Test with invalid root keys + assert_false(validate_alert(invalid_root_keys)) + + # Root value validation + invalid_root_values = deepcopy(valid_alert) + invalid_root_values['metadata'] = ['value'] + + # Test with invalid root values + assert_false(validate_alert(invalid_root_values)) + + # metadata key validation + invalid_metadata_keys = deepcopy(valid_alert) + invalid_metadata_keys['metadata'] = {'log': 'osquery'} + + # Test with invalid metadata keys + assert_false(validate_alert(invalid_metadata_keys)) + + # metadata > source key validation + invalid_metadata_source_01 = deepcopy(valid_alert) + invalid_metadata_source_01['metadata']['source'] = {'service': 'kinesis'} + + # Test with invalid metadata source keys + assert_false(validate_alert(invalid_metadata_source_01)) + + # metadata > source value validation + invalid_metadata_source_02 = deepcopy(valid_alert) + invalid_metadata_source_02['metadata']['source']['entity'] = 100 + + # Test with invalid metadata source values + assert_false(validate_alert(invalid_metadata_source_02)) + + # metadata > outputs type validation + invalid_metadata_outputs = deepcopy(valid_alert) + invalid_metadata_outputs['metadata']['outputs'] = {'bad': 'value'} + + # Test with invalid metadata outputs type + assert_false(validate_alert(invalid_metadata_outputs)) + + # metadata > outputs value validation + invalid_metadata_outputs_value = deepcopy(valid_alert) + invalid_metadata_outputs_value['metadata']['outputs'] = ['good', 100] + + # Test with invalid metadata outputs value + assert_false(validate_alert(invalid_metadata_outputs_value)) + + # metadata > non-string value validation + invalid_metadata_non_string = deepcopy(valid_alert) + invalid_metadata_non_string['metadata']['type'] = 4.5 + + # Test with invalid metadata non-string value + assert_false(validate_alert(invalid_metadata_non_string)) + + + + + + + + +# +# +# { +# 'default': { +# +# } +# } diff --git a/test/unit/stream_alert_alert_processor/test_outputs.py b/test/unit/stream_alert_alert_processor/test_outputs.py index eacdfa29f..3bc1fcb61 100644 --- a/test/unit/stream_alert_alert_processor/test_outputs.py +++ b/test/unit/stream_alert_alert_processor/test_outputs.py @@ -449,7 +449,7 @@ def _setup_dispatch(self): creds = 
{'url': 'https://api.slack.com/web-hook-key'} put_mock_creds(output_name, creds, self.__dispatcher.secrets_bucket, - REGION, KMS_ALIAS) + REGION, KMS_ALIAS) return _get_alert() @@ -605,7 +605,7 @@ def test_locals(self): def _setup_dispatch(self): """Helper for setting up LambdaOutput dispatch""" function_name = CONFIG[self.__service][self.__descriptor] - _create_lambda_function(function_name, REGION) + create_lambda_function(function_name, REGION) return _get_alert() @mock_lambda From bfd711124dbf889c6c02df21045ea310e703f011 Mon Sep 17 00:00:00 2001 From: Ryan Deivert Date: Wed, 12 Jul 2017 13:43:01 -0700 Subject: [PATCH 05/14] [tests] adding unit tests to cover new code --- stream_alert/alert_processor/main.py | 2 +- .../test_helpers.py | 37 +---- .../stream_alert_alert_processor/test_main.py | 146 +++++++++++++++--- 3 files changed, 126 insertions(+), 59 deletions(-) diff --git a/stream_alert/alert_processor/main.py b/stream_alert/alert_processor/main.py index 87b86e865..2f0aa4452 100644 --- a/stream_alert/alert_processor/main.py +++ b/stream_alert/alert_processor/main.py @@ -161,7 +161,7 @@ def _load_output_config(config_path='conf/outputs.json'): try: config = json.load(outputs) except ValueError: - LOGGER.error('The conf/outputs.json file could not be loaded into json') + LOGGER.error('The \'%s\' file could not be loaded into json', config_path) return return config diff --git a/test/unit/stream_alert_alert_processor/test_helpers.py b/test/unit/stream_alert_alert_processor/test_helpers.py index 35c169404..d1bb16a79 100644 --- a/test/unit/stream_alert_alert_processor/test_helpers.py +++ b/test/unit/stream_alert_alert_processor/test_helpers.py @@ -17,34 +17,16 @@ from nose.tools import assert_true, assert_false, assert_is_instance - from stream_alert.alert_processor.helpers import ( validate_alert ) +from unit.stream_alert_alert_processor.helpers import _get_alert def test_alert_validate_root(): """Alert Structure Validation""" # Default valid alert to be copied/modified - valid_alert = { - 'record': { - 'name': 'testing testing', - 'action': 'no action required' - }, - 'metadata': { - 'log': 'osquery', - 'rule_name': 'valid_alert_test', - 'outputs': [ - 'slack:alerts' - ], - 'source': { - 'service': 'kinesis', - 'entity': 'org_prefix_stream_alert_kinesis' - }, - 'type': 'json', - 'rule_description': 'Sample rule description \n\nthat can be multiple lines.\n' - } - } + valid_alert = _get_alert() # Test with a valid alert structure assert_true(validate_alert(valid_alert)) @@ -104,18 +86,3 @@ def test_alert_validate_root(): # Test with invalid metadata non-string value assert_false(validate_alert(invalid_metadata_non_string)) - - - - - - - - -# -# -# { -# 'default': { -# -# } -# } diff --git a/test/unit/stream_alert_alert_processor/test_main.py b/test/unit/stream_alert_alert_processor/test_main.py index 37550837c..3ed7b6221 100644 --- a/test/unit/stream_alert_alert_processor/test_main.py +++ b/test/unit/stream_alert_alert_processor/test_main.py @@ -16,9 +16,14 @@ import json from collections import OrderedDict -from mock import patch, call +from mock import mock_open, patch -from nose.tools import assert_equal, assert_is_instance +from nose.tools import ( + assert_equal, + assert_is_instance, + assert_list_equal, + assert_true +) from stream_alert.alert_processor.main import ( _load_output_config, @@ -31,33 +36,32 @@ FUNCTION_NAME ) -from unit.stream_alert_alert_processor.helpers import _get_mock_context +from unit.stream_alert_alert_processor.helpers import ( + _get_alert, + 
_get_mock_context +) @patch('stream_alert.alert_processor.main.run') def test_handler_run(run_mock): """Main handler `run` call params""" context = _get_mock_context() - message = { - 'record': {'value': 'data'}, - 'metadata': { - 'rule_name': 'rule_name', - 'rule_description': 'rule_description', - 'log': 'log_source', - 'outputs': ['rule:output01', 'rule:output02'], - 'type': 'payload.type', - 'source': { - 'service': 'payload.service', - 'entity': 'payload.entity' - } - } - } - for _ in handler(message, context): - pass + handler(None, context) # This test will load the actual config, so we should compare the # function call against the same config here. - run_mock.assert_called_with(message, REGION, FUNCTION_NAME, _load_output_config()) + run_mock.assert_called_with(None, REGION, FUNCTION_NAME, _load_output_config()) + +@patch('logging.Logger.error') +def test_bad_config(log_mock): + """Load output config - bad config""" + mock = mock_open(read_data='non-json string that will log an error') + with patch('__builtin__.open', mock): + handler(None, None) + + log_mock.assert_called_with( + 'The \'%s\' file could not be loaded into json', + 'conf/outputs.json') def test_handler_return(): @@ -84,8 +88,104 @@ def test_sort_dict(): assert_equal(type(sorted_dict), OrderedDict) - index = 0 keys = ['a', 'b', 'c', 'd', 'e', 'f', 'g'] - for key in sorted_dict.keys(): + for index, key in enumerate(sorted_dict.keys()): assert_equal(keys[index], key) - index += 1 + + +def test_sort_dict_recursive(): + """Sorted Dict Recursion""" + dict_to_sort = {'c': 100, 'a': 1, 'b': {'b': 10, 'c': 1000, 'a': 1}} + sorted_dict = _sort_dict(dict_to_sort) + + assert_equal(type(sorted_dict), OrderedDict) + assert_equal(type(sorted_dict['b']), OrderedDict) + + sub_keys = ['a', 'b', 'c'] + for index, key in enumerate(sorted_dict['b'].keys()): + assert_equal(sub_keys[index], key) + + +@patch('urllib2.urlopen') +@patch('stream_alert.alert_processor.main._load_output_config') +@patch('stream_alert.alert_processor.output_base.StreamOutputBase._load_creds') +def test_running_success(creds_mock, config_mock, url_mock): + """Alert Processor run handler - success""" + config_mock.return_value = _load_output_config('test/unit/conf/outputs.json') + creds_mock.return_value = {'url': 'mock.url'} + url_mock.return_value.getcode.return_value = 200 + + alert = _get_alert() + context = _get_mock_context() + + result = handler(alert, context) + assert_is_instance(result, list) + + assert_true(result[0][0]) + + +@patch('logging.Logger.error') +@patch('stream_alert.alert_processor.main._load_output_config') +def test_running_bad_output(config_mock, log_mock): + """Alert Processor run handler - bad output""" + config_mock.return_value = _load_output_config('test/unit/conf/outputs.json') + + alert = _get_alert() + alert['metadata']['outputs'] = ['slack'] + context = _get_mock_context() + + handler(alert, context) + + log_mock.assert_called_with( + 'Improperly formatted output [%s]. 
Outputs for rules must '
+        'be declared with both a service and a descriptor for the '
+        'integration (ie: \'slack:my_channel\')', 'slack')
+
+    alert['metadata']['outputs'] = ['slakc:test']
+
+    handler(alert, context)
+
+    log_mock.assert_called_with(
+        'The output \'%s\' does not exist!', 'slakc:test')
+
+
+@patch('stream_alert.alert_processor.main._load_output_config')
+@patch('stream_alert.alert_processor.main.get_output_dispatcher')
+def test_running_no_dispatcher(dispatch_mock, config_mock):
+    """Alert Processor run handler - no dispatcher"""
+    config_mock.return_value = _load_output_config('test/unit/conf/outputs.json')
+    dispatch_mock.return_value = None
+
+    alert = _get_alert()
+    context = _get_mock_context()
+
+    result = handler(alert, context)
+
+    assert_is_instance(result, list)
+    assert_list_equal(result, [])
+
+
+@patch('logging.Logger.exception')
+@patch('urllib2.urlopen')
+@patch('stream_alert.alert_processor.main._load_output_config')
+@patch('stream_alert.alert_processor.main.get_output_dispatcher')
+@patch('stream_alert.alert_processor.output_base.StreamOutputBase._load_creds')
+def test_running_exception_occurred(creds_mock, dispatch_mock, config_mock, url_mock, log_mock):
+    """Alert Processor run handler - exception occurred"""
+    # Use TypeError as the mock's side_effect
+    err = TypeError('bad error')
+    creds_mock.return_value = {'url': 'mock.url'}
+    dispatch_mock.return_value.dispatch.side_effect = err
+    config_mock.return_value = _load_output_config('test/unit/conf/outputs.json')
+    url_mock.return_value.getcode.return_value = 200
+
+    alert = _sort_dict(_get_alert())
+    context = _get_mock_context()
+
+    handler(alert, context)
+
+    log_mock.assert_called_with(
+        'An error occurred while sending alert '
+        'to %s:%s: %s. alert:\n%s', 'slack', 'unit_test_channel',
+        err, json.dumps(alert, indent=2))
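
Note: these tests lean on a `_get_mock_context` helper whose body is not shown in this series; a minimal equivalent, assuming only the attributes that `handler` and `run` actually read, might look like:

```python
"""Sketch of a mocked Lambda context like the `_get_mock_context` helper
the tests import; the attribute values here are assumptions."""
from mock import Mock

def _get_mock_context():
    context = Mock()
    # run() derives the AWS region from index 3 of the invoked function ARN,
    # and the handler reads function_name directly
    context.invoked_function_arn = (
        'arn:aws:lambda:us-east-1:123456789012:'
        'function:unit-testing_streamalert_alert_processor:production')
    context.function_name = 'unit-testing_streamalert_alert_processor'
    return context
```

From 90f0739eb41e5e4bc06794a7b658d83f87389ae2 Mon Sep 17 00:00:00 2001
From: Jack Naglieri
Date: Wed, 12 Jul 2017 14:25:19 -0700
Subject: [PATCH 06/14] [tf] stream_alert module changes for direct invocation
 of lambda functions

---
 terraform/modules/tf_stream_alert/iam.tf  | 41 +++++++++--------
 terraform/modules/tf_stream_alert/main.tf | 56 ++++++++++------------
 terraform/modules/tf_stream_alert/s3.tf   | 15 ++++++
 terraform/modules/tf_stream_alert/sns.tf  | 36 +++++----------
 4 files changed, 72 insertions(+), 76 deletions(-)
 create mode 100644 terraform/modules/tf_stream_alert/s3.tf

diff --git a/terraform/modules/tf_stream_alert/iam.tf b/terraform/modules/tf_stream_alert/iam.tf
index 948e2bfb4..803b7f34c 100644
--- a/terraform/modules/tf_stream_alert/iam.tf
+++ b/terraform/modules/tf_stream_alert/iam.tf
@@ -1,12 +1,11 @@
-/*
-// Rule Processor Execution Role
-*/
+// IAM Role: Rule Processor Execution Role
 resource "aws_iam_role" "streamalert_rule_processor_role" {
   name = "${var.prefix}_${var.cluster}_streamalert_rule_processor_role"
 
   assume_role_policy = "${data.aws_iam_policy_document.lambda_assume_role_policy.json}"
 }
 
+// IAM Policy Doc: Generic Lambda AssumeRole
 data "aws_iam_policy_document" "lambda_assume_role_policy" {
   statement {
     effect = "Allow"
@@ -19,39 +18,38 @@ data "aws_iam_policy_document" "lambda_assume_role_policy" {
   }
 }
 
-// Policy: Allow the Rule Processor to send alerts to SNS
-resource "aws_iam_role_policy" "streamalert_rule_processor_sns" {
-  name = "${var.prefix}_${var.cluster}_streamalert_rule_processor_send_to_sns"
+// IAM Role Policy: Allow the Rule Processor to invoke the Alert Processor
+resource "aws_iam_role_policy" "streamalert_rule_processor_lambda"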
{ + name = "${var.prefix}_${var.cluster}_streamalert_rule_processor_invoke_alert_proc" role = "${aws_iam_role.streamalert_rule_processor_role.id}" policy = "${data.aws_iam_policy_document.rule_processor_sns.json}" } -data "aws_iam_policy_document" "rule_processor_sns" { +// IAM Policy Doc: Allow the Rule Processor to invoke the Alert Processor +data "aws_iam_policy_document" "rule_processor_invoke_alert_proc" { statement { effect = "Allow" actions = [ - "sns:Publish", - "sns:Subscribe", + "lambda:InvokeFunction", ] + # Use interpolation because of the different VPC/non vpc resources resources = [ - "${aws_sns_topic.streamalert.arn}", + "arn:aws:lambda:${var.region}:${var.account_id}:function:${var.prefix}_${var.cluster}_streamalert_alert_processor", ] } } -/* -// Alert Processor Execution Role -*/ +// IAM Role: Alert Processor Execution Role resource "aws_iam_role" "streamalert_alert_processor_role" { name = "${var.prefix}_${var.cluster}_streamalert_alert_processor_role" assume_role_policy = "${data.aws_iam_policy_document.lambda_assume_role_policy.json}" } -// Policy: Allow the Alert Processor to decrypt secrets +// IAM Role Policy: Allow the Alert Processor to decrypt secrets resource "aws_iam_role_policy" "streamalert_alert_processor_kms" { name = "${var.prefix}_${var.cluster}_streamalert_alert_processor_kms" role = "${aws_iam_role.streamalert_alert_processor_role.id}" @@ -59,6 +57,7 @@ resource "aws_iam_role_policy" "streamalert_alert_processor_kms" { policy = "${data.aws_iam_policy_document.rule_processor_kms_decrypt.json}" } +// IAM Policy Doc: KMS key permissions for decryption data "aws_iam_policy_document" "rule_processor_kms_decrypt" { statement { effect = "Allow" @@ -74,8 +73,8 @@ data "aws_iam_policy_document" "rule_processor_kms_decrypt" { } } -// Policy: Allow the Alert Processor to write objects to S3. -// The default S3 bucket is also created by this module. +// IAM Role Policy: Allow the Alert Processor to write objects to S3. +// The default S3 bucket is also created by this module. 
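
Note: the KMS grant above and the S3 grant defined next are what let the Alert Processor fetch and decrypt output credentials at runtime. Roughly, and with illustrative bucket and key names rather than the actual `_load_creds` implementation:

```python
"""Sketch of the runtime path these grants enable (names are assumptions)."""
import boto3

s3 = boto3.client('s3', region_name='us-east-1')
kms = boto3.client('kms', region_name='us-east-1')

# s3:GetObject on the secrets bucket fetches the encrypted credentials
# blob for a configured output
blob = s3.get_object(
    Bucket='prefix.cluster.streamalert.secrets',
    Key='slack/my_channel',
)['Body'].read()

# kms:Decrypt on the stream_alert_secrets key recovers the plaintext creds
creds = kms.decrypt(CiphertextBlob=blob)['Plaintext']
```
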
resource "aws_iam_role_policy" "streamalert_alert_processor_s3" { name = "${var.prefix}_${var.cluster}_streamalert_alert_processor_s3_default" role = "${aws_iam_role.streamalert_alert_processor_role.id}" @@ -83,6 +82,7 @@ resource "aws_iam_role_policy" "streamalert_alert_processor_s3" { policy = "${data.aws_iam_policy_document.alert_processor_s3.json}" } +// IAM Policy Doc: Allow fetching of secrets and putting of alerts data "aws_iam_policy_document" "alert_processor_s3" { statement { effect = "Allow" @@ -111,7 +111,7 @@ data "aws_iam_policy_document" "alert_processor_s3" { } } -// Policy: Allow the Alert Processor to write cloudwatch logs +// IAM Role Policy: Allow the Alert Processor to write CloudWatch logs resource "aws_iam_role_policy" "streamalert_alert_processor_cloudwatch" { name = "${var.prefix}_${var.cluster}_streamalert_alert_processor_cloudwatch" role = "${aws_iam_role.streamalert_alert_processor_role.id}" @@ -119,6 +119,7 @@ resource "aws_iam_role_policy" "streamalert_alert_processor_cloudwatch" { policy = "${data.aws_iam_policy_document.alert_processor_cloudwatch.json}" } +// IAM Policy Doc: Allow creating log groups and events in any CloudWatch stream data "aws_iam_policy_document" "alert_processor_cloudwatch" { statement { effect = "Allow" @@ -135,7 +136,7 @@ data "aws_iam_policy_document" "alert_processor_cloudwatch" { } } -// Policy: Allow the Alert Processor to invoke Lambda functions +// IAM Role Policy: Allow the Alert Processor to invoke configured Lambda functions resource "aws_iam_role_policy" "streamalert_alert_processor_lambda" { count = "${length(var.output_lambda_functions)}" name = "${var.prefix}_${var.cluster}_streamalert_alert_processor_lambda_${count.index}" @@ -158,7 +159,7 @@ resource "aws_iam_role_policy" "streamalert_alert_processor_lambda" { EOF } -// Policy: Allow the Alert Processor to send to arbitrary S3 buckets as outputs +// IAM Role Policy: Allow the Alert Processor to send to arbitrary S3 buckets as outputs resource "aws_iam_role_policy" "streamalert_alert_processor_s3_outputs" { count = "${length(var.output_s3_buckets)}" name = "${var.prefix}_${var.cluster}_streamalert_alert_processor_s3_output_${count.index}" @@ -182,7 +183,7 @@ resource "aws_iam_role_policy" "streamalert_alert_processor_s3_outputs" { EOF } -// Policy: Allow the Alert Processor to run in a VPC +// IAM Role Policy: Allow the Alert Processor to run in a VPC resource "aws_iam_role_policy" "streamalert_alert_processor_vpc" { count = "${var.alert_processor_vpc_enabled ? 
1 : 0}" name = "${var.prefix}_${var.cluster}_streamalert_alert_processor_vpc" diff --git a/terraform/modules/tf_stream_alert/main.tf b/terraform/modules/tf_stream_alert/main.tf index 2b9c3b4e0..0f5cbbe23 100644 --- a/terraform/modules/tf_stream_alert/main.tf +++ b/terraform/modules/tf_stream_alert/main.tf @@ -1,5 +1,5 @@ -// AWS Lambda Function: StreamAlert Processor -// Matches rules against logs from Kinesis Streams or S3 +// Lambda Function: Rule Processor +// Matches rules against logs from Kinesis, S3, or SNS resource "aws_lambda_function" "streamalert_rule_processor" { function_name = "${var.prefix}_${var.cluster}_streamalert_rule_processor" description = "StreamAlert Rule Processor" @@ -16,7 +16,7 @@ resource "aws_lambda_function" "streamalert_rule_processor" { } } -// StreamAlert Processor Production Alias +// Lambda Alias: Rule Processor Production resource "aws_lambda_alias" "rule_processor_production" { name = "production" description = "Production StreamAlert Rule Processor Alias" @@ -24,21 +24,9 @@ resource "aws_lambda_alias" "rule_processor_production" { function_version = "${var.rule_processor_version}" } -// Allow SNS to invoke the StreamAlert Output Processor -resource "aws_lambda_permission" "sns_inputs" { - count = "${length(var.input_sns_topics)}" - statement_id = "AllowExecutionFromSNS${count.index}" - action = "lambda:InvokeFunction" - function_name = "${aws_lambda_function.streamalert_rule_processor.arn}" - principal = "sns.amazonaws.com" - source_arn = "${element(var.input_sns_topics, count.index)}" - qualifier = "production" -} - -// AWS Lambda Function: StreamAlert Alert Processor +// Lambda Function: Alert Processor // Send alerts to declared outputs - -// Lambda Function inside a VPC +// VPC resource "aws_lambda_function" "streamalert_alert_processor_vpc" { count = "${var.alert_processor_vpc_enabled ? 1 : 0}" function_name = "${var.prefix}_${var.cluster}_streamalert_alert_processor" @@ -61,7 +49,9 @@ resource "aws_lambda_function" "streamalert_alert_processor_vpc" { } } -// Non VPC Lambda Function +// Lambda Function: Alert Processor +// Send alerts to declared outputs +// Non VPC resource "aws_lambda_function" "streamalert_alert_processor" { count = "${var.alert_processor_vpc_enabled ? 0 : 1}" function_name = "${var.prefix}_${var.cluster}_streamalert_alert_processor" @@ -79,8 +69,8 @@ resource "aws_lambda_function" "streamalert_alert_processor" { } } -// StreamAlert Output Processor Production Alias -// VPC +// Lambda Alias: Alert Processor Production +// VPC resource "aws_lambda_alias" "alert_processor_production_vpc" { count = "${var.alert_processor_vpc_enabled ? 1 : 0}" name = "production" @@ -89,7 +79,8 @@ resource "aws_lambda_alias" "alert_processor_production_vpc" { function_version = "${var.alert_processor_version}" } -// Non VPC +// Lambda Alias: Alert Processor Production +// Non VPC resource "aws_lambda_alias" "alert_processor_production" { count = "${var.alert_processor_vpc_enabled ? 0 : 1}" name = "production" @@ -98,27 +89,28 @@ resource "aws_lambda_alias" "alert_processor_production" { function_version = "${var.alert_processor_version}" } -// Allow SNS to invoke the Alert Processor -// VPC -resource "aws_lambda_permission" "with_sns_vpc" { +// Lambda Permission: Allow SNS to invoke the Alert Processor +// VPC +resource "aws_lambda_permission" "rule_processor_vpc" { count = "${var.alert_processor_vpc_enabled ? 
1 : 0}" - statement_id = "AllowExecutionFromSNS" + statement_id = "AllowExecutionFromLambda" action = "lambda:InvokeFunction" function_name = "${aws_lambda_function.streamalert_alert_processor_vpc.arn}" - principal = "sns.amazonaws.com" - source_arn = "${aws_sns_topic.streamalert.arn}" + principal = "lambda.amazonaws.com" + source_arn = "${aws_lambda_function.streamalert_rule_processor.arn}" qualifier = "production" depends_on = ["aws_lambda_alias.alert_processor_production_vpc"] } -// Non VPC -resource "aws_lambda_permission" "with_sns" { +// Lambda Permission: Allow SNS to invoke the Alert Processor +// Non VPC +resource "aws_lambda_permission" "rule_processor" { count = "${var.alert_processor_vpc_enabled ? 0 : 1}" - statement_id = "AllowExecutionFromSNS" + statement_id = "AllowExecutionFromLambda" action = "lambda:InvokeFunction" function_name = "${aws_lambda_function.streamalert_alert_processor.arn}" - principal = "sns.amazonaws.com" - source_arn = "${aws_sns_topic.streamalert.arn}" + principal = "lambda.amazonaws.com" + source_arn = "${aws_lambda_function.streamalert_rule_processor.arn}" qualifier = "production" depends_on = ["aws_lambda_alias.alert_processor_production"] } diff --git a/terraform/modules/tf_stream_alert/s3.tf b/terraform/modules/tf_stream_alert/s3.tf new file mode 100644 index 000000000..2b5965cdb --- /dev/null +++ b/terraform/modules/tf_stream_alert/s3.tf @@ -0,0 +1,15 @@ +// S3 Bucket: Store StreamAlerts from the Alert Processor +resource "aws_s3_bucket" "streamalerts" { + bucket = "${replace("${var.prefix}.${var.cluster}.streamalerts", "_", ".")}" + acl = "private" + force_destroy = false + + versioning { + enabled = true + } + + logging { + target_bucket = "${var.s3_logging_bucket}" + target_prefix = "${replace("${var.prefix}.${var.cluster}.streamalerts", "_", ".")}/" + } +} diff --git a/terraform/modules/tf_stream_alert/sns.tf b/terraform/modules/tf_stream_alert/sns.tf index 4f41b1f96..703724efe 100644 --- a/terraform/modules/tf_stream_alert/sns.tf +++ b/terraform/modules/tf_stream_alert/sns.tf @@ -1,30 +1,18 @@ -// SNS Topic to emit alerts to -resource "aws_sns_topic" "streamalert" { - name = "${var.prefix}_${var.cluster}_streamalerts" - display_name = "${var.prefix}_${var.cluster}_streamalerts" -} - -// Subscribe the Alert Processor Lambda function to the SNS topic -// VPC -resource "aws_sns_topic_subscription" "alert_processor_vpc" { - count = "${var.alert_processor_vpc_enabled ? 1 : 0}" - topic_arn = "${aws_sns_topic.streamalert.arn}" - endpoint = "${aws_lambda_function.streamalert_alert_processor_vpc.arn}:production" - protocol = "lambda" -} - -// Non VPC -resource "aws_sns_topic_subscription" "alert_processor" { - count = "${var.alert_processor_vpc_enabled ? 
0 : 1}"
-  topic_arn = "${aws_sns_topic.streamalert.arn}"
-  endpoint  = "${aws_lambda_function.streamalert_alert_processor.arn}:production"
-  protocol  = "lambda"
-}
-
-// Subscribe the Rule Processor Lambda function to arbitrary SNS topics
+// SNS Topic Subscription: Subscribe the Rule Processor to configured SNS topics
 resource "aws_sns_topic_subscription" "input_topic_subscriptions" {
   count     = "${length(var.input_sns_topics)}"
   topic_arn = "${element(var.input_sns_topics, count.index)}"
   endpoint  = "${aws_lambda_function.streamalert_rule_processor.arn}:production"
   protocol  = "lambda"
 }
+
+// Lambda Permission: Allow SNS to invoke the Rule Processor
+resource "aws_lambda_permission" "sns_inputs" {
+  count         = "${length(var.input_sns_topics)}"
+  statement_id  = "AllowExecutionFromSNS${count.index}"
+  action        = "lambda:InvokeFunction"
+  function_name = "${aws_lambda_function.streamalert_rule_processor.arn}"
+  principal     = "sns.amazonaws.com"
+  source_arn    = "${element(var.input_sns_topics, count.index)}"
+  qualifier     = "production"
+}

From 252b5eab5a05f9f258b0f313a4bfc953691ef446 Mon Sep 17 00:00:00 2001
From: Jack Naglieri
Date: Thu, 13 Jul 2017 09:16:20 -0700
Subject: [PATCH 07/14] [modules] lower the threshold on iterator age alarm

---
 terraform/modules/tf_stream_alert_monitoring/main.tf | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/terraform/modules/tf_stream_alert_monitoring/main.tf b/terraform/modules/tf_stream_alert_monitoring/main.tf
index a698e4bdf..d259d7fdb 100644
--- a/terraform/modules/tf_stream_alert_monitoring/main.tf
+++ b/terraform/modules/tf_stream_alert_monitoring/main.tf
@@ -49,7 +49,7 @@ resource "aws_cloudwatch_metric_alarm" "streamalert_lambda_iterator_age" {
   metric_name         = "IteratorAge"
   statistic           = "Maximum"
   comparison_operator = "GreaterThanThreshold"
-  threshold           = "43000000"
+  threshold           = "1000000"
   evaluation_periods  = "1"
   period              = "300"
   alarm_description   = "StreamAlert Lambda High Iterator Age: ${element(var.lambda_functions, count.index)}"
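
Note: Lambda's IteratorAge metric is reported in milliseconds, so the new 1,000,000 threshold alarms once records sit roughly 17 minutes in the stream, versus about 12 hours at the old 43,000,000 value. The equivalent alarm expressed with boto3, with illustrative function and alarm names, would look roughly like:

```python
"""Sketch of the equivalent IteratorAge alarm via boto3 (names are assumptions)."""
import boto3

cloudwatch = boto3.client('cloudwatch', region_name='us-east-1')

cloudwatch.put_metric_alarm(
    AlarmName='prefix_cluster_streamalert_lambda_iterator_age',
    Namespace='AWS/Lambda',
    MetricName='IteratorAge',
    Dimensions=[{'Name': 'FunctionName',
                 'Value': 'prefix_cluster_streamalert_rule_processor'}],
    Statistic='Maximum',
    ComparisonOperator='GreaterThanThreshold',
    # Milliseconds: alarm after ~17 minutes of unprocessed records
    Threshold=1000000,
    EvaluationPeriods=1,
    Period=300,
)
```

From dd790937f0d27769dbcef47e9f96a6582616a06d Mon Sep 17 00:00:00 2001
From: Jack Naglieri
Date: Thu, 13 Jul 2017 09:16:42 -0700
Subject: [PATCH 08/14] [tf] SNS topic management for cloudwatch alarms

* Optionally create an SNS topic for receiving metric alarms
* Optionally accept an existing SNS topic to receive metric alarms
* Update documentation on usage
* Unit tests
---
 conf/global.json                        |  5 +++
 docs/source/clusters.rst                | 30 ++++++++++++++++
 stream_alert_cli/terraform_generate.py  | 27 +++++++++++++-
 .../test_terraform_generate.py          | 36 ++++++++++++++++++-
 4 files changed, 96 insertions(+), 2 deletions(-)

diff --git a/conf/global.json b/conf/global.json
index 530dd24f3..3b63a957a 100644
--- a/conf/global.json
+++ b/conf/global.json
@@ -9,5 +9,10 @@
         "tfstate_bucket": "PREFIX_GOES_HERE.streamalert.terraform.state",
         "tfstate_s3_key": "stream_alert_state/terraform.tfstate",
         "tfvars": "terraform.tfvars"
+    },
+    "infrastructure": {
+        "monitoring": {
+            "create_sns_topic": true
+        }
     }
 }
\ No newline at end of file
diff --git a/docs/source/clusters.rst b/docs/source/clusters.rst
index fe2eab46d..5a77ee36c 100644
--- a/docs/source/clusters.rst
+++ b/docs/source/clusters.rst
@@ -130,6 +130,36 @@ To disable CloudWatch alarms, set to ``false``.
     }
   }
 
+To configure the SNS topic used to receive CloudWatch metric alarms, use one of the following options in the ``conf/global.json`` configuration file.
+
+Option 1: Create a new topic. This tells the StreamAlert CLI to create a new topic called ``stream_alert_monitoring``.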
All clusters will send alarms to this topic. + +.. code-block:: json + + { + "account": {...}, + "terraform": {...} + "infrastructure": { + "monitoring": { + "create_sns_topic": true + } + } + } + +Option 2: Use an existing SNS topic within your AWS account (created outside of the scope of StreamAlert). + +.. code-block:: json + + { + "account": {...}, + "terraform": {...} + "infrastructure": { + "monitoring": { + "sns_topic_name": "my_sns_topic" + } + } + } + Module: Kinesis Events ---------------------- diff --git a/stream_alert_cli/terraform_generate.py b/stream_alert_cli/terraform_generate.py index 12364db31..9cedbee40 100644 --- a/stream_alert_cli/terraform_generate.py +++ b/stream_alert_cli/terraform_generate.py @@ -155,6 +155,13 @@ def generate_main(**kwargs): 'target_key_id': '${aws_kms_key.stream_alert_secrets.key_id}' } + infrastructure_config = config['global'].get('infrastructure') + if infrastructure_config and 'monitoring' in infrastructure_config: + if infrastructure_config['monitoring'].get('create_sns_topic'): + main_dict['resource']['aws_sns_topic']['stream_alert_monitoring'] = { + 'name': 'stream_alert_monitoring' + } + return main_dict @@ -277,9 +284,27 @@ def generate_cloudwatch_monitoring(cluster_name, cluster_dict, config): [bool] Result of applying the cloudwatch_monitoring module """ prefix = config['global']['account']['prefix'] + infrastructure_config = config['global'].get('infrastructure') + if infrastructure_config and 'monitoring' in infrastructure_config: + if infrastructure_config['monitoring'].get('create_sns_topic'): + sns_topic_arn = 'arn:aws:sns:{region}:{account_id}:{topic}'.format( + region=config['global']['account']['region'], + account_id=config['global']['account']['aws_account_id'], + topic='stream_alert_monitoring' + ) + elif infrastructure_config['monitoring'].get('sns_topic_name'): + sns_topic_arn = 'arn:aws:sns:{region}:{account_id}:{topic}'.format( + region=config['global']['account']['region'], + account_id=config['global']['account']['aws_account_id'], + topic=infrastructure_config['monitoring']['sns_topic_name'] + ) + else: + LOGGER_CLI.error('Invalid config: Make sure you declare global infrastructure options!') + return False + cluster_dict['module']['cloudwatch_monitoring_{}'.format(cluster_name)] = { 'source': 'modules/tf_stream_alert_monitoring', - 'sns_topic_arn': '${{module.stream_alert_{}.sns_topic_arn}}'.format(cluster_name), + 'sns_topic_arn': sns_topic_arn, 'lambda_functions': [ '{}_{}_streamalert_rule_processor'.format(prefix, cluster_name), '{}_{}_streamalert_alert_processor'.format(prefix, cluster_name) diff --git a/test/unit/stream_alert_cli/test_terraform_generate.py b/test/unit/stream_alert_cli/test_terraform_generate.py index 3f807b3b0..0f517d716 100644 --- a/test/unit/stream_alert_cli/test_terraform_generate.py +++ b/test/unit/stream_alert_cli/test_terraform_generate.py @@ -39,6 +39,11 @@ def setup(self): }, 'terraform': { 'tfstate_bucket': 'unit-testing.terraform.tfstate' + }, + 'infrastructure': { + 'monitoring': { + 'create_sns_topic': True + } } }, 'lambda': { @@ -312,6 +317,11 @@ def test_generate_main(self): 'target_prefix': 'unit-testing.streamalerts/' } } + }, + 'aws_sns_topic': { + 'stream_alert_monitoring': { + 'name': 'stream_alert_monitoring' + } } } } @@ -474,9 +484,10 @@ def test_generate_cloudwatch_monitoring(self): self.config ) + # Test a the default SNS topic option expected_cloudwatch_tf = { 'source': 'modules/tf_stream_alert_monitoring', - 'sns_topic_arn': '${module.stream_alert_test.sns_topic_arn}', 
+            'sns_topic_arn': 'arn:aws:sns:us-west-1:12345678910:stream_alert_monitoring',
             'lambda_functions': [
                 'unit-testing_test_streamalert_rule_processor',
                 'unit-testing_test_streamalert_alert_processor'
@@ -488,6 +499,29 @@
             self.cluster_dict['module']['cloudwatch_monitoring_test'],
             expected_cloudwatch_tf)
 
+        # Test a pre-defined SNS topic
+        self.config['global']['infrastructure']['monitoring']['create_sns_topic'] = False
+        self.config['global']['infrastructure']['monitoring']['sns_topic_name'] = 'unit_test_monitoring'
+        terraform_generate.generate_cloudwatch_monitoring(
+            cluster_name,
+            self.cluster_dict,
+            self.config
+        )
+
+        expected_cloudwatch_tf_custom = {
+            'source': 'modules/tf_stream_alert_monitoring',
+            'sns_topic_arn': 'arn:aws:sns:us-west-1:12345678910:unit_test_monitoring',
+            'lambda_functions': [
+                'unit-testing_test_streamalert_rule_processor',
+                'unit-testing_test_streamalert_alert_processor'
+            ],
+            'kinesis_stream': 'unit-testing_test_stream_alert_kinesis'
+        }
+
+        assert_equal(
+            self.cluster_dict['module']['cloudwatch_monitoring_test'],
+            expected_cloudwatch_tf_custom)
+
     def test_generate_cluster_test(self):
         """CLI - Terraform Generate 'Test' Cluster"""

From d71981aee0d2fb298c15be79f7b7ee33e035a79c Mon Sep 17 00:00:00 2001
From: Ryan Deivert
Date: Thu, 13 Jul 2017 18:46:52 -0700
Subject: [PATCH 09/14] [tf] fixing a few terraform bugs

---
 terraform/modules/tf_stream_alert/iam.tf    | 2 +-
 terraform/modules/tf_stream_alert/main.tf   | 4 ++--
 terraform/modules/tf_stream_alert/output.tf | 4 ----
 3 files changed, 3 insertions(+), 7 deletions(-)

diff --git a/terraform/modules/tf_stream_alert/iam.tf b/terraform/modules/tf_stream_alert/iam.tf
index 803b7f34c..4a0001191 100644
--- a/terraform/modules/tf_stream_alert/iam.tf
+++ b/terraform/modules/tf_stream_alert/iam.tf
@@ -23,7 +23,7 @@ resource "aws_iam_role_policy" "streamalert_rule_processor_lambda" {
   name = "${var.prefix}_${var.cluster}_streamalert_rule_processor_invoke_alert_proc"
   role = "${aws_iam_role.streamalert_rule_processor_role.id}"
 
-  policy = "${data.aws_iam_policy_document.rule_processor_sns.json}"
+  policy = "${data.aws_iam_policy_document.rule_processor_invoke_alert_proc.json}"
 }
 
 // IAM Policy Doc: Allow the Rule Processor to invoke the Alert Processor
diff --git a/terraform/modules/tf_stream_alert/main.tf b/terraform/modules/tf_stream_alert/main.tf
index 0f5cbbe23..5ab96bdcb 100644
--- a/terraform/modules/tf_stream_alert/main.tf
+++ b/terraform/modules/tf_stream_alert/main.tf
@@ -89,7 +89,7 @@ resource "aws_lambda_alias" "alert_processor_production" {
   function_version = "${var.alert_processor_version}"
 }
 
-// Lambda Permission: Allow SNS to invoke the Alert Processor
+// Lambda Permission: Allow Lambda to invoke the Alert Processor
 // VPC
 resource "aws_lambda_permission" "rule_processor_vpc" {
   count = "${var.alert_processor_vpc_enabled ? 1 : 0}"
@@ -102,7 +102,7 @@ resource "aws_lambda_permission" "rule_processor_vpc" {
   depends_on = ["aws_lambda_alias.alert_processor_production_vpc"]
 }
 
-// Lambda Permission: Allow SNS to invoke the Alert Processor
+// Lambda Permission: Allow Lambda to invoke the Alert Processor
 // Non VPC
 resource "aws_lambda_permission" "rule_processor" {
   count = "${var.alert_processor_vpc_enabled ? 0 : 1}"
diff --git a/terraform/modules/tf_stream_alert/output.tf b/terraform/modules/tf_stream_alert/output.tf
index e3827d296..1ea2fb454 100644
--- a/terraform/modules/tf_stream_alert/output.tf
+++ b/terraform/modules/tf_stream_alert/output.tf
@@ -9,7 +9,3 @@ output "lambda_role_arn" {
 output "lambda_role_id" {
   value = "${aws_iam_role.streamalert_rule_processor_role.id}"
 }
-
-output "sns_topic_arn" {
-  value = "${aws_sns_topic.streamalert.arn}"
-}

From b48c3a6d838f68e059bd7e8fbdfb78d0e7e15a71 Mon Sep 17 00:00:00 2001
From: Ryan Deivert
Date: Thu, 13 Jul 2017 18:47:49 -0700
Subject: [PATCH 10/14] [lambda][alert] fixing bug in alert processor that caused alerts to always go to $LATEST lambda

---
 stream_alert/rule_processor/sink.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/stream_alert/rule_processor/sink.py b/stream_alert/rule_processor/sink.py
index e2ea416ab..18c7b4364 100644
--- a/stream_alert/rule_processor/sink.py
+++ b/stream_alert/rule_processor/sink.py
@@ -78,12 +78,13 @@ def sink(self, alerts):
                 response = self.client_lambda.invoke(
                     FunctionName=self.function,
                     InvocationType='Event',
-                    Payload=data
+                    Payload=data,
+                    Qualifier='production'
                 )
 
             except ClientError as err:
                 LOGGER.exception('An error occurred while sending alert to '
-                                 '\'%s\'. Error is: %s. Alert: %s',
+                                 '\'%s:production\'. Error is: %s. Alert: %s',
                                  self.function, err.response, data)

From 34853502dc286f165fa685af58d915eb1a13d33b Mon Sep 17 00:00:00 2001
From: Ryan Deivert
Date: Thu, 13 Jul 2017 19:18:27 -0700
Subject: [PATCH 11/14] [docs] fixing error in json blocks that failed build

---
 docs/source/clusters.rst | 22 +++++++++++++++-------
 1 file changed, 15 insertions(+), 7 deletions(-)

diff --git a/docs/source/clusters.rst b/docs/source/clusters.rst
index 5a77ee36c..bd58d106d 100644
--- a/docs/source/clusters.rst
+++ b/docs/source/clusters.rst
@@ -135,24 +135,32 @@ To configure the SNS topic used to receive CloudWatch metric alarms, use one of
 Option 1: Create a new topic. This tells the StreamAlert CLI to create a new topic called ``stream_alert_monitoring``. All clusters will send alarms to this topic.
 
 .. code-block:: json
-
+
   {
-    "account": {...},
-    "terraform": {...}
+    "account": {
+      "...": "..."
+    },
+    "terraform": {
+      "...": "..."
+    },
     "infrastructure": {
       "monitoring": {
         "create_sns_topic": true
       }
     }
   }
-
+
 Option 2: Use an existing SNS topic within your AWS account (created outside of the scope of StreamAlert).
 
 .. code-block:: json
-
+
   {
-    "account": {...},
-    "terraform": {...}
+    "account": {
+      "...": "..."
+    },
     "infrastructure": {
       "monitoring": {
         "sns_topic_name": "my_sns_topic"

From 21cb539a69fdd8e398620b9026fb0667e444de5d Mon Sep 17 00:00:00 2001
From: Ryan Deivert
Date: Mon, 17 Jul 2017 13:14:40 -0700
Subject: [PATCH 12/14] [pr] feedback from @jacknagz & @austinbyers

---
 stream_alert/alert_processor/helpers.py | 11 +--
 stream_alert/alert_processor/main.py    | 10 +--
 .../test_helpers.py                     | 84 ++++++++++++++-----
 3 files changed, 70 insertions(+), 35 deletions(-)

diff --git a/stream_alert/alert_processor/helpers.py b/stream_alert/alert_processor/helpers.py
index 69ccc1234..066e591cb 100644
--- a/stream_alert/alert_processor/helpers.py
+++ b/stream_alert/alert_processor/helpers.py
@@ -34,7 +34,7 @@ def validate_alert(alert):
     metadata_keys = {'log', 'rule_name', 'rule_description', 'type', 'source', 'outputs'}
     if not set(alert['metadata'].keys()) == metadata_keys:
         LOGGER.error('The value of the \'metadata\' key must be a map (dict) '
-                     'that contains the following keys: %s',
+                     'that contains only the following keys: %s',
                      ', '.join('\'{}\''.format(key) for key in metadata_keys))
         return False
 
@@ -44,7 +44,7 @@ def validate_alert(alert):
             if not (isinstance(alert['metadata'][key], dict) and
                     set(alert['metadata'][key].keys()) == {'service', 'entity'}):
                 LOGGER.error('The value of the \'source\' key must be a map (dict) that '
-                             'contains \'service\' and \'entity\' keys.')
+                             'contains only \'service\' and \'entity\' keys.')
                 valid = False
                 continue
 
@@ -53,7 +53,6 @@
                     LOGGER.error('The value of the \'%s\' key within \'%s\' must be '
                                  'a string (str).', entry, key)
                     valid = False
-                    continue
 
         elif key == 'outputs':
             if not (isinstance(alert['metadata'][key], list) and
@@ -69,13 +68,11 @@
                     LOGGER.error('The value of each entry in the \'outputs\' list '
                                  'must be a string (str).')
                     valid = False
-                    continue
 
         elif not isinstance(alert['metadata'][key], (str, unicode)):
             LOGGER.error('The value of the \'%s\' key must be a string (str), not %s',
                          key, type(alert['metadata'][key]))
             valid = False
-            continue
 
     return valid
 
@@ -91,8 +88,8 @@ def _validate_root(alert):
     """
     if not (isinstance(alert, dict) and
             set(alert.keys()) == {'record', 'metadata'}):
-        LOGGER.error('The alert must be a map (dict) that contains \'record\' '
-                     'and \'metadata\' keys.')
+        LOGGER.error('The alert must be a map (dict) that contains only '
+                     '\'record\' and \'metadata\' keys.')
         return False
 
     if not (isinstance(alert['record'], dict) and
diff --git a/stream_alert/alert_processor/main.py b/stream_alert/alert_processor/main.py
index 2f0aa4452..62a267ef4 100644
--- a/stream_alert/alert_processor/main.py
+++ b/stream_alert/alert_processor/main.py
@@ -52,7 +52,7 @@ def handler(event, context):
     function_name = context.function_name
 
     # Return the current list of statuses back to the caller
-    return [status for status in run(event, region, function_name, config)]
+    return list(status for status in run(event, region, function_name, config))
 
 
 def run(alert, region, function_name, config):
     """Send an Alert to its described outputs.
@@ -76,12 +76,12 @@ def run(alert, region, function_name, config):
         }
     }
 
-        region [string]: the AWS region being used
-        function_name [string]: the name of the lambda function
-        config [dict]: the loaded configuration for outputs from conf/outputs.json
+        region [string]: The AWS region of the currently executing Lambda function
+        function_name [string]: The name of the lambda function
+        config [dict]: The loaded configuration for outputs from conf/outputs.json
 
     Returns:
-        [generator] yields back dispatch status and name of the output to the handler
+        [generator] Yields back dispatch status and name of the output to the handler
     """
     if not validate_alert(alert):
         LOGGER.error('Invalid alert:\n%s', json.dumps(alert, indent=2))
diff --git a/test/unit/stream_alert_alert_processor/test_helpers.py b/test/unit/stream_alert_alert_processor/test_helpers.py
index d1bb16a79..7fde38251 100644
--- a/test/unit/stream_alert_alert_processor/test_helpers.py
+++ b/test/unit/stream_alert_alert_processor/test_helpers.py
@@ -13,9 +13,7 @@
 See the License for the specific language governing permissions and
 limitations under the License.
 '''
-from copy import deepcopy
-
-from nose.tools import assert_true, assert_false, assert_is_instance
+from nose.tools import assert_true, assert_false
 
 from stream_alert.alert_processor.helpers import (
     validate_alert
@@ -23,65 +21,105 @@
 
 from unit.stream_alert_alert_processor.helpers import _get_alert
 
-def test_alert_validate_root():
-    """Alert Structure Validation"""
-    # Default valid alert to be copied/modified
+def test_valid_alert():
+    """Alert Processor Input Validation - Valid Alert Structure"""
+    # Default valid alert to test
     valid_alert = _get_alert()
 
     # Test with a valid alert structure
     assert_true(validate_alert(valid_alert))
 
-    # Root key validation
-    invalid_root_keys = deepcopy(valid_alert)
+
+def test_root_keys():
+    """Alert Processor Input Validation - Invalid Root Keys"""
+    # Default valid alert to be modified
+    invalid_root_keys = _get_alert()
+
+    # Remove 'metadata' key to break root key validation
     invalid_root_keys.pop('metadata')
 
     # Test with invalid root keys
     assert_false(validate_alert(invalid_root_keys))
 
-    # Root value validation
-    invalid_root_values = deepcopy(valid_alert)
+
+def test_metadata_value():
+    """Alert Processor Input Validation - Invalid Root Metadata Value"""
+    # Default valid alert to be modified
+    invalid_root_values = _get_alert()
+
+    # Make the 'metadata' key's value a list to break root value validation
     invalid_root_values['metadata'] = ['value']
 
     # Test with invalid root values
     assert_false(validate_alert(invalid_root_values))
 
-    # metadata key validation
-    invalid_metadata_keys = deepcopy(valid_alert)
+
+def test_metadata_keys():
+    """Alert Processor Input Validation - Metadata Keys Missing"""
+    # Default valid alert to be modified
+    invalid_metadata_keys = _get_alert()
+
+    # Alter 'metadata' keys to break validation (not all required keys)
     invalid_metadata_keys['metadata'] = {'log': 'osquery'}
 
     # Test with invalid metadata keys
     assert_false(validate_alert(invalid_metadata_keys))
 
+
+def test_metadata_source_keys():
+    """Alert Processor Input Validation - Source Keys Missing"""
+    # Default valid alert to be modified
+    invalid_metadata_source = _get_alert()
+
     # metadata > source key validation
-    invalid_metadata_source_01 = deepcopy(valid_alert)
-    invalid_metadata_source_01['metadata']['source'] = {'service': 'kinesis'}
+    invalid_metadata_source['metadata']['source'] = {'service': 'kinesis'}
 
     # Test with invalid metadata source keys
-    assert_false(validate_alert(invalid_metadata_source_01))
+    assert_false(validate_alert(invalid_metadata_source))
+
+
+def test_metadata_source_value():
+    """Alert Processor Input Validation - Source Entity Value"""
+    # Default valid alert to be modified
+    invalid_metadata_source = _get_alert()
 
     # metadata > source value validation
-    invalid_metadata_source_02 = deepcopy(valid_alert)
-    invalid_metadata_source_02['metadata']['source']['entity'] = 100
+    invalid_metadata_source['metadata']['source']['entity'] = 100
 
     # Test with invalid metadata source values
-    assert_false(validate_alert(invalid_metadata_source_02))
+    assert_false(validate_alert(invalid_metadata_source))
+
+
+def test_outputs_type():
+    """Alert Processor Input Validation - Metadata Outputs Bad Type"""
+    # Default valid alert to be modified
+    invalid_metadata_outputs = _get_alert()
 
     # metadata > outputs type validation
-    invalid_metadata_outputs = deepcopy(valid_alert)
     invalid_metadata_outputs['metadata']['outputs'] = {'bad': 'value'}
 
     # Test with invalid metadata outputs type
     assert_false(validate_alert(invalid_metadata_outputs))
 
+
+def test_outputs_value_type():
+    """Alert Processor Input Validation - Metadata Outputs Bad Value Type"""
+    # Default valid alert to be modified
+    invalid_metadata_outputs = _get_alert()
+
     # metadata > outputs value validation
-    invalid_metadata_outputs_value = deepcopy(valid_alert)
-    invalid_metadata_outputs_value['metadata']['outputs'] = ['good', 100]
+    invalid_metadata_outputs['metadata']['outputs'] = ['good', 100]
 
     # Test with invalid metadata outputs value
-    assert_false(validate_alert(invalid_metadata_outputs_value))
+    assert_false(validate_alert(invalid_metadata_outputs))
+
+
+def test_metadata_non_string_type():
+    """Alert Processor Input Validation - Metadata Non-String"""
+    # Default valid alert to be modified
+    invalid_metadata_non_string = _get_alert()
 
     # metadata > non-string value validation
-    invalid_metadata_non_string = deepcopy(valid_alert)
     invalid_metadata_non_string['metadata']['type'] = 4.5
 
     # Test with invalid metadata non-string value

From 5c54db5d935b72b7d88653948eebb4a6ed9dc617 Mon Sep 17 00:00:00 2001
From: Ryan Deivert
Date: Mon, 17 Jul 2017 14:15:51 -0700
Subject: [PATCH 13/14] [pr] second round of feedback

---
 stream_alert/alert_processor/main.py              |  2 +-
 terraform/modules/tf_stream_alert/s3.tf           | 15 ---------------
 test/unit/stream_alert_cli/test_outputs.py        |  2 +-
 .../unit/stream_alert_rule_processor/test_sink.py | 13 +++++++------
 4 files changed, 9 insertions(+), 23 deletions(-)
 delete mode 100644 terraform/modules/tf_stream_alert/s3.tf

diff --git a/stream_alert/alert_processor/main.py b/stream_alert/alert_processor/main.py
index 62a267ef4..ce44827b0 100644
--- a/stream_alert/alert_processor/main.py
+++ b/stream_alert/alert_processor/main.py
@@ -52,7 +52,7 @@ def handler(event, context):
     function_name = context.function_name
 
     # Return the current list of statuses back to the caller
-    return list(status for status in run(event, region, function_name, config))
+    return list(run(event, region, function_name, config))
 
 
 def run(alert, region, function_name, config):
     """Send an Alert to its described outputs.
diff --git a/terraform/modules/tf_stream_alert/s3.tf b/terraform/modules/tf_stream_alert/s3.tf
deleted file mode 100644
index 2b5965cdb..000000000
--- a/terraform/modules/tf_stream_alert/s3.tf
+++ /dev/null
@@ -1,15 +0,0 @@
-// S3 Bucket: Store StreamAlerts from the Alert Processor
-resource "aws_s3_bucket" "streamalerts" {
-  bucket        = "${replace("${var.prefix}.${var.cluster}.streamalerts", "_", ".")}"
-  acl           = "private"
-  force_destroy = false
-
-  versioning {
-    enabled = true
-  }
-
-  logging {
-    target_bucket = "${var.s3_logging_bucket}"
-    target_prefix = "${replace("${var.prefix}.${var.cluster}.streamalerts", "_", ".")}/"
-  }
-}
diff --git a/test/unit/stream_alert_cli/test_outputs.py b/test/unit/stream_alert_cli/test_outputs.py
index 8a30134d6..be4da1bb1 100644
--- a/test/unit/stream_alert_cli/test_outputs.py
+++ b/test/unit/stream_alert_cli/test_outputs.py
@@ -34,7 +34,7 @@
 
 def test_load_output_config():
     """Load outputs configuration"""
-    config = load_outputs_config()
+    config = load_outputs_config('test/unit/conf')
     loaded_config_keys = sorted(config.keys())
 
     expected_config_keys = [u'aws-lambda', u'aws-s3', u'pagerduty', u'phantom', u'slack']
diff --git a/test/unit/stream_alert_rule_processor/test_sink.py b/test/unit/stream_alert_rule_processor/test_sink.py
index 996c21fb2..7b1f3c47b 100644
--- a/test/unit/stream_alert_rule_processor/test_sink.py
+++ b/test/unit/stream_alert_rule_processor/test_sink.py
@@ -69,12 +69,13 @@ def test_json_from_dict():
     json_message = sink._json_dump(alert)
 
     # Test with single alert entry
-    assert_equal(json_message, '{"record": {"record_data_key02_01": ' \
-        '"record_data_value02_01", "record_data_key01_01": "record_data_value01_01"}, ' \
-        '"metadata": {"source": {"service": "payload_service_01", ' \
-        '"entity": "payload_entity_01"}, "rule_name": "test_rule_01", ' \
-        '"type": "payload_type_01", "log": "payload_data_01", ' \
-        '"outputs": "rule.outputs_01"}}')
+    assert_equal(json_message, '{"record": {"record_data_key02_01": '
+                 '"record_data_value02_01", "record_data_key01_01": '
+                 '"record_data_value01_01"}, "metadata": {"source": '
+                 '{"service": "payload_service_01", "entity": '
+                 '"payload_entity_01"}, "rule_name": "test_rule_01", '
+                 '"type": "payload_type_01", "log": "payload_data_01", '
+                 '"outputs": "rule.outputs_01"}}')
 
 def get_payload(byte_size):
     """Returns a base64 encoded random payload of (roughly) byte_size length

From bf8433e8cc96ec9c66c08f70d92087b6084bb893 Mon Sep 17 00:00:00 2001
From: Ryan Deivert
Date: Mon, 17 Jul 2017 16:40:43 -0700
Subject: [PATCH 14/14] [package] fixing issue where pip messes up logger

---
 stream_alert_cli/package.py | 23 +++++++++++------------
 1 file changed, 11 insertions(+), 12 deletions(-)

diff --git a/stream_alert_cli/package.py b/stream_alert_cli/package.py
index e260cb54d..78ad16e94 100644
--- a/stream_alert_cli/package.py
+++ b/stream_alert_cli/package.py
@@ -23,8 +23,8 @@
 import tempfile
 
 import boto3
-import pip
 
+from stream_alert_cli.helpers import run_command
 from stream_alert_cli.logger import LOGGER_CLI
 
 
@@ -170,19 +170,18 @@ def _resolve_third_party(self, temp_package_path):
             [boolean] False if the pip command failed to install requirements, True otherwise
         """
         third_party_libs = self.config['lambda'][self.config_key]['third_party_libraries']
-        if third_party_libs:
-            LOGGER_CLI.info(
-                'Installing third-party libraries: %s', ', '.join(third_party_libs))
-            pip_command = ['install']
-            pip_command.extend(third_party_libs)
-            pip_command.extend(['--upgrade', '--target', temp_package_path])
-            # Return True if the pip result code is 0
-            return pip.main(pip_command) == 0
-        else:
+        # Return a default of True here if no libraries to install
+        if not third_party_libs:
             LOGGER_CLI.info('No third-party libraries to install.')
+            return True
 
-        # Return a default of True here if pip is not called
-        return True
+        LOGGER_CLI.info('Installing third-party libraries: %s', ', '.join(third_party_libs))
+        pip_command = ['pip', 'install']
+        pip_command.extend(third_party_libs)
+        pip_command.extend(['--upgrade', '--target', temp_package_path])
+
+        # Return True if the pip command is successfully run
+        return run_command(pip_command, cwd=temp_package_path)
 
     def _upload(self, package_path):
         """Upload the StreamAlert package and sha256 to S3.