From 9e01eda9f884cec6ab7291d9abe54a2ecac99c34 Mon Sep 17 00:00:00 2001 From: Austin Byers Date: Fri, 16 Mar 2018 16:28:02 -0700 Subject: [PATCH 1/6] Rule processor no longer invokes alert processor --- stream_alert/rule_processor/alert_forward.py | 66 +-------- stream_alert/rule_processor/handler.py | 11 +- stream_alert_cli/helpers.py | 33 +++++ stream_alert_cli/logger.py | 1 + stream_alert_cli/test.py | 16 ++- terraform/modules/tf_stream_alert/iam.tf | 24 ---- terraform/modules/tf_stream_alert/main.tf | 9 +- .../modules/tf_stream_alert_globals/main.tf | 6 - .../test_alert_forward.py | 130 +++--------------- .../test_handler.py | 20 +-- 10 files changed, 81 insertions(+), 235 deletions(-) diff --git a/stream_alert/rule_processor/alert_forward.py b/stream_alert/rule_processor/alert_forward.py index 2244453cd..96546c6cc 100644 --- a/stream_alert/rule_processor/alert_forward.py +++ b/stream_alert/rule_processor/alert_forward.py @@ -16,7 +16,6 @@ from datetime import datetime import json import os -import time import boto3 from botocore.exceptions import ClientError @@ -27,7 +26,6 @@ class AlertForwarder(object): """Sends alerts to the Alert Processor and the alerts Dynamo table.""" - # TODO: Do not send to Alert Processor after Alert Merger is implemented def __init__(self, env): """Initialize the Forwarder with the boto3 clients and resource names. @@ -35,66 +33,8 @@ def __init__(self, env): Args: env (dict): loaded dictionary containing environment information """ - self.env = env self.table = boto3.resource( 'dynamodb', region_name=env['lambda_region']).Table(os.environ['ALERTS_TABLE']) - self.client_lambda = boto3.client('lambda', region_name=self.env['lambda_region']) - self.function = os.environ['ALERT_PROCESSOR'] - - # Keep track of unprocessed items when retrying batch_write_item() - self.unprocessed_items = None - - def _send_to_lambda(self, alerts): - """Invoke Alert Processor directly - - Sends a message to the alert processor with the following JSON format: - { - 'record': record, - 'rule_name': rule.rule_name, - 'rule_description': rule.rule_function.__doc__ or DEFAULT_RULE_DESCRIPTION, - 'log_source': str(payload.log_source), - 'log_type': payload.type, - 'outputs': rule.outputs, - 'source_service': payload.service(), - 'source_entity': payload.entity, - 'context': rule.context - } - """ - for alert in alerts: - try: - data = json.dumps(alert, default=lambda o: o.__dict__) - except AttributeError as err: - LOGGER.error('An error occurred while dumping alert to JSON: %s ' - 'Alert: %s', - err.message, - alert) - continue - - try: - response = self.client_lambda.invoke( - FunctionName=self.function, - InvocationType='Event', - Payload=data, - Qualifier='production' - ) - - except ClientError as err: - LOGGER.exception('An error occurred while sending alert to ' - '\'%s:production\'. Error is: %s. Alert: %s', - self.function, - err.response, - data) - continue - - if response['ResponseMetadata']['HTTPStatusCode'] != 202: - LOGGER.error('Failed to send alert to \'%s\': %s', - self.function, data) - continue - - if self.env['lambda_alias'] != 'development': - LOGGER.info('Sent alert to \'%s\' with Lambda request ID \'%s\'', - self.function, - response['ResponseMetadata']['RequestId']) @staticmethod def dynamo_record(alert): @@ -112,9 +52,7 @@ def dynamo_record(alert): 'SourceService': alert['source_service'], 'Outputs': set(alert['outputs']), # Compact JSON encoding (no extra spaces) - 'Record': json.dumps(alert['record'], separators=(',', ':')), - # TODO: Remove TTL after alert merger is implemented - 'TTL': int(time.time()) + 7200 # 2 hour TTL + 'Record': json.dumps(alert['record'], separators=(',', ':')) } def _send_to_dynamo(self, alerts): @@ -132,8 +70,6 @@ def send_alerts(self, alerts): Args: alerts (list): A list of dictionaries representing json alerts. """ - self._send_to_lambda(alerts) - try: self._send_to_dynamo(alerts) except ClientError: diff --git a/stream_alert/rule_processor/handler.py b/stream_alert/rule_processor/handler.py index f75e08a14..04e515816 100644 --- a/stream_alert/rule_processor/handler.py +++ b/stream_alert/rule_processor/handler.py @@ -30,15 +30,12 @@ class StreamAlert(object): """Wrapper class for handling StreamAlert classificaiton and processing""" config = {} - def __init__(self, context, enable_alert_processor=True): + def __init__(self, context): """Initializer Args: context (dict): An AWS context object which provides metadata on the currently executing lambda function. - enable_alert_processor (bool): If the user wants to send the alerts using their - own methods, 'enable_alert_processor' can be set to False to suppress - sending with the StreamAlert alert processor. """ # Load the config. Validation occurs during load, which will # raise exceptions on any ConfigErrors @@ -54,7 +51,6 @@ def __init__(self, context, enable_alert_processor=True): # Instantiate a classifier that is used for this run self.classifier = StreamClassifier(config=self.config) - self.enable_alert_processor = enable_alert_processor self._failed_record_count = 0 self._processed_record_count = 0 self._processed_size = 0 @@ -128,7 +124,7 @@ def run(self, event): # Apply Threat Intel to normalized records in the end of Rule Processor invocation record_alerts = self._rule_engine.threat_intel_match(payload_with_normalized_records) self._alerts.extend(record_alerts) - if record_alerts and self.enable_alert_processor: + if record_alerts: self.alert_forwarder.send_alerts(record_alerts) MetricLogger.log_metric(FUNCTION_NAME, @@ -218,7 +214,6 @@ def _process_alerts(self, payload): # Extend the list of alerts with any new ones so they can be returned self._alerts.extend(record_alerts) - if self.enable_alert_processor: - self.alert_forwarder.send_alerts(record_alerts) + self.alert_forwarder.send_alerts(record_alerts) return payload_with_normalized_records diff --git a/stream_alert_cli/helpers.py b/stream_alert_cli/helpers.py index 1a13a9148..b4bc09240 100644 --- a/stream_alert_cli/helpers.py +++ b/stream_alert_cli/helpers.py @@ -489,6 +489,38 @@ def setup_mock_dynamodb_ioc_table(config): ) +@mock_dynamodb2 +def setup_mock_alerts_table(table_name): + """Create a mock DynamoDB alerts table used by rule processor, alert processor, alert merger""" + boto3.client('dynamodb').create_table( + AttributeDefinitions=[ + { + 'AttributeName': 'RuleName', + 'AttributeType': 'S' + }, + { + 'AttributeName': 'AlertID', + 'AttributeType': 'S' + } + ], + KeySchema=[ + { + 'AttributeName': 'RuleName', + 'KeyType': 'HASH' + }, + { + 'AttributeName': 'AlertID', + 'KeyType': 'RANGE' + } + ], + ProvisionedThroughput={ + 'ReadCapacityUnits': 5, + 'WriteCapacityUnits': 5 + }, + TableName=table_name + ) + + def put_mock_s3_object(bucket, key, data, region): """Create a mock AWS S3 object for testing @@ -524,6 +556,7 @@ def wrap(func): """Wrap the returned function with or without mocks""" if context.mocked: @mock_cloudwatch + @mock_dynamodb2 @mock_kinesis @mock_kms @mock_lambda diff --git a/stream_alert_cli/logger.py b/stream_alert_cli/logger.py index f8f1841f7..255d27b8f 100644 --- a/stream_alert_cli/logger.py +++ b/stream_alert_cli/logger.py @@ -35,6 +35,7 @@ def filter(self, record): 'Completed download in*', '*triggered alert*', '*Firehose*', + 'Successfully sent*', 'Got * normalized records' ) diff --git a/stream_alert_cli/test.py b/stream_alert_cli/test.py index 5a6681f4a..bb11f2c23 100644 --- a/stream_alert_cli/test.py +++ b/stream_alert_cli/test.py @@ -80,7 +80,7 @@ def __init__(self, context, config, print_output): helpers.setup_mock_dynamodb_ioc_table(config) # Create the RuleProcessor. Passing a mocked context object with fake # values and False for suppressing sending of alerts to alert processor - self.processor = StreamAlert(context, False) + self.processor = StreamAlert(context) self.cli_config = config # Use a list of status_messages to store pass/fail/warning info self.status_messages = [] @@ -329,7 +329,6 @@ def _detect_old_test_event(test_event): return False - def check_keys(self, test_event): """Check if the test event contains the required keys @@ -483,7 +482,6 @@ def check_log_declared_in_sources(self, base_message, test_event): return True - def analyze_record_delta(self, file_name, test_event): """Provide some additional context on why this test failed. This will perform some analysis of the test record to determine which keys are @@ -492,7 +490,7 @@ def analyze_record_delta(self, file_name, test_event): the end of the test run. Args: - rule_name (str): Name of rule being tested + file_name (str): Name of file containing the test event test_event (dict): Actual record data being tested """ base_message = ('Invalid test event in file \'{}.json\' with description ' @@ -611,7 +609,6 @@ def __init__(self, config, context): self.outputs_config = load_outputs_config() self.region = config['global']['account']['region'] self._cleanup_old_secrets() - self.region = config['global']['account']['region'] helpers.setup_mock_firehose_delivery_streams(config) def test_processor(self, alerts): @@ -933,10 +930,13 @@ def run_tests(options, context): options (namedtuple): CLI options (debug, processor, etc) context (namedtuple): A constructed aws context object """ - # The Rule Processor uses env variables to determine where alerts should be forwarded: + # The Rule Processor and Alert Processor need environment variables for many things prefix = config['global']['account']['prefix'] + alerts_table = '{}_streamalert_alerts'.format(prefix) os.environ['ALERT_PROCESSOR'] = '{}_streamalert_alert_processor'.format(prefix) - os.environ['ALERTS_TABLE'] = '{}_streamalert_alerts'.format(prefix) + os.environ['ALERTS_TABLE'] = alerts_table + os.environ['AWS_DEFAULT_REGION'] = config['global']['account']['region'] + os.environ['CLUSTER'] = run_options.get('cluster') or '' if options.debug: # TODO(jack): Currently there is no (clean) way to set @@ -982,6 +982,8 @@ def run_tests(options, context): files_filter = run_options.get('files', {}) # Run the rule processor for all rules or designated rule set + if context.mocked: + helpers.setup_mock_alerts_table(alerts_table) for alerts in rule_proc_tester.test_processor(rules_filter, files_filter, validate_schemas): diff --git a/terraform/modules/tf_stream_alert/iam.tf b/terraform/modules/tf_stream_alert/iam.tf index d1835898c..7af1f7aa5 100644 --- a/terraform/modules/tf_stream_alert/iam.tf +++ b/terraform/modules/tf_stream_alert/iam.tf @@ -23,30 +23,6 @@ resource "aws_iam_role_policy_attachment" "stream_alert_rule_processor_cloudwatc policy_arn = "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole" } -// IAM Role Policy: Allow the Rule Processor to invoke the Alert Processor -resource "aws_iam_role_policy" "streamalert_rule_processor_lambda" { - name = "LambdaInvokeAlertProcessor" - role = "${aws_iam_role.streamalert_rule_processor_role.id}" - - policy = "${data.aws_iam_policy_document.rule_processor_invoke_alert_proc.json}" -} - -// IAM Policy Doc: Allow the Rule Processor to invoke the Alert Processor -data "aws_iam_policy_document" "rule_processor_invoke_alert_proc" { - statement { - effect = "Allow" - - actions = [ - "lambda:InvokeFunction", - ] - - # Use interpolation because of the different VPC/non vpc resources - resources = [ - "arn:aws:lambda:${var.region}:${var.account_id}:function:${var.prefix}_streamalert_alert_processor", - ] - } -} - // IAM Role Policy: Allow the Rule Processor to save alerts to dynamo. resource "aws_iam_role_policy" "save_alerts_to_dynamo" { name = "SaveAlertsToDynamo" diff --git a/terraform/modules/tf_stream_alert/main.tf b/terraform/modules/tf_stream_alert/main.tf index e28d5bd19..d880c6545 100644 --- a/terraform/modules/tf_stream_alert/main.tf +++ b/terraform/modules/tf_stream_alert/main.tf @@ -13,11 +13,10 @@ resource "aws_lambda_function" "streamalert_rule_processor" { environment { variables = { - ALERT_PROCESSOR = "${var.prefix}_streamalert_alert_processor" - ALERTS_TABLE = "${var.prefix}_streamalert_alerts" - CLUSTER = "${var.cluster}" - ENABLE_METRICS = "${var.rule_processor_enable_metrics}" - LOGGER_LEVEL = "${var.rule_processor_log_level}" + ALERTS_TABLE = "${var.prefix}_streamalert_alerts" + CLUSTER = "${var.cluster}" + ENABLE_METRICS = "${var.rule_processor_enable_metrics}" + LOGGER_LEVEL = "${var.rule_processor_log_level}" } } diff --git a/terraform/modules/tf_stream_alert_globals/main.tf b/terraform/modules/tf_stream_alert_globals/main.tf index 4160e4398..e853dd086 100644 --- a/terraform/modules/tf_stream_alert_globals/main.tf +++ b/terraform/modules/tf_stream_alert_globals/main.tf @@ -23,12 +23,6 @@ resource "aws_dynamodb_table" "alerts_table" { name = "AlertID" type = "S" } - // Enable expriation time while testing Dynamo table for alerts - // TODO: Remove TTL once Alert Merger is implemented - ttl { - attribute_name = "TTL" - enabled = true - } tags { Name = "StreamAlert" } diff --git a/tests/unit/stream_alert_rule_processor/test_alert_forward.py b/tests/unit/stream_alert_rule_processor/test_alert_forward.py index b2c7d84f4..a53d7d45b 100644 --- a/tests/unit/stream_alert_rule_processor/test_alert_forward.py +++ b/tests/unit/stream_alert_rule_processor/test_alert_forward.py @@ -13,8 +13,6 @@ See the License for the specific language governing permissions and limitations under the License. """ -# pylint: disable=no-self-use,protected-access -from datetime import datetime import os from botocore.exceptions import ClientError @@ -26,7 +24,6 @@ from stream_alert.rule_processor.config import load_env from tests.unit.stream_alert_rule_processor.test_helpers import get_mock_context - _MOCK_ALERT = { 'id': 'test-uuid', 'log_source': 'test_source', @@ -40,143 +37,54 @@ 'source_entity': 'test_entity', 'source_service': 'test_service' } +_ALERT_TABLE = 'corp-prefix_streamalert_alerts' +_CLUSTER = 'corp' -@patch.dict(os.environ, {'CLUSTER': 'corp'}) +@patch.dict(os.environ, {'CLUSTER': _CLUSTER}) class TestAlertForwarder(object): """Test class for AlertForwarder""" - ALERT_PROCESSOR = 'corp-prefix_streamalert_alert_processor' - ALERTS_TABLE = 'corp-prefix_streamalert_alerts' - - @classmethod - def setup_class(cls): - """Setup the class before any methods""" - patcher = patch('boto3.client') - cls.boto_mock = patcher.start() - context = get_mock_context() - env = load_env(context) - with patch.dict(os.environ, {'ALERT_PROCESSOR': cls.ALERT_PROCESSOR, - 'ALERTS_TABLE': cls.ALERTS_TABLE}): - cls.forwarder = AlertForwarder(env) - - @classmethod - def teardown_class(cls): - """Teardown the class after any methods""" - cls.forwarder = None - cls.boto_mock.stop() - - def teardown(self): - """Teardown the class after each methods""" - self.forwarder.env['lambda_alias'] = 'development' - - @patch('stream_alert.rule_processor.alert_forward.LOGGER') - def test_lambda_boto_error(self, log_mock): - """AlertForwarder - Lambda - Boto Error""" - - err_response = {'Error': {'Code': 100}} - - # Add ClientError side_effect to mock - self.boto_mock.return_value.invoke.side_effect = ClientError( - err_response, 'operation') + # pylint: disable=no-self-use,protected-access - self.forwarder._send_to_lambda(['alert!!!']) - - log_mock.assert_has_calls([ - call.exception( - 'An error occurred while sending alert to \'%s:production\'. ' - 'Error is: %s. Alert: %s', self.ALERT_PROCESSOR, - err_response, '"alert!!!"' - ) - ]) - - @patch('stream_alert.rule_processor.alert_forward.LOGGER') - def test_lambda_resp_error(self, log_mock): - """AlertForwarder - Lambda - Boto Response Error""" - self.boto_mock.return_value.invoke.side_effect = [{ - 'ResponseMetadata': {'HTTPStatusCode': 201}}] - - self.forwarder._send_to_lambda(['alert!!!']) - - log_mock.assert_has_calls([ - call.error('Failed to send alert to \'%s\': %s', self.ALERT_PROCESSOR, '"alert!!!"') - ]) + @patch.dict(os.environ, {'ALERTS_TABLE': _ALERT_TABLE}) + def setup(self): + # pylint: disable=attribute-defined-outside-init + self.forwarder = AlertForwarder(load_env(get_mock_context())) - @patch('stream_alert.rule_processor.alert_forward.LOGGER') - def test_lambda_success(self, log_mock): - """AlertForwarder - Lambda - Success""" - self.boto_mock.return_value.invoke.side_effect = [{ - 'ResponseMetadata': { - 'HTTPStatusCode': 202, - 'RequestId': 'reqID' - } - }] - - # Swap out the alias so the logging occurs - self.forwarder.env['lambda_alias'] = 'production' - - self.forwarder._send_to_lambda(['alert!!!']) - - log_mock.assert_has_calls([ - call.info('Sent alert to \'%s\' with Lambda request ID \'%s\'', - self.ALERT_PROCESSOR, 'reqID') - ]) - - @patch('stream_alert.rule_processor.alert_forward.LOGGER') - def test_lambda_bad_obj(self, log_mock): - """AlertForwarder - Lambda - JSON Dump Bad Object""" - bad_object = datetime.utcnow() - self.forwarder._send_to_lambda([bad_object]) - - log_mock.assert_has_calls([ - call.error('An error occurred while dumping alert to JSON: %s Alert: %s', - '\'datetime.datetime\' object has no attribute \'__dict__\'', bad_object), - ]) - - def test_dynamo_record(self): - """AlertForwarder - Convert Alert to Dynamo Record""" - record = AlertForwarder.dynamo_record(_MOCK_ALERT) + def test_alert_item(self): + """AlertForwarder - Convert Alert to Dynamo Item""" + item = AlertForwarder.dynamo_record(_MOCK_ALERT) expected = { 'RuleName': 'test_name', 'AlertID': 'test-uuid', 'Created': ANY, - 'Cluster': 'corp', + 'Cluster': _CLUSTER, 'LogSource': 'test_source', 'LogType': 'test_type', 'RuleDescription': 'Test Description', 'SourceEntity': 'test_entity', 'SourceService': 'test_service', 'Outputs': {'out1:here', 'out2:there'}, # Duplicates are ignored - 'Record': '{"key":"value"}', - 'TTL': ANY + 'Record': '{"key":"value"}' } - assert_equal(expected, record) + assert_equal(expected, item) @mock_dynamodb2() @patch('stream_alert.rule_processor.alert_forward.LOGGER') - def test_send_to_dynamo(self, mock_logger): - """AlertForwarder - Send Alerts to Dynamo""" - self.forwarder._send_to_dynamo([_MOCK_ALERT] * 2) + def test_send_alerts(self, mock_logger): + """AlertForwarder - Send Alerts""" + self.forwarder.send_alerts([_MOCK_ALERT] * 2) mock_logger.assert_has_calls([ - call.info('Successfully sent %d alert(s) to dynamo:%s', 2, self.ALERTS_TABLE) + call.info('Successfully sent %d alert(s) to dynamo:%s', 2, _ALERT_TABLE) ]) @patch.object(AlertForwarder, '_send_to_dynamo') - @patch.object(AlertForwarder, '_send_to_lambda') - def test_send_alerts(self, mock_lambda, mock_dynamo): - """AlertForwarder - Send Alerts Entry Point""" - self.forwarder.send_alerts(None) - mock_lambda.assert_called_once_with(None) - mock_dynamo.assert_called_once_with(None) - - @patch.object(AlertForwarder, '_send_to_dynamo') - @patch.object(AlertForwarder, '_send_to_lambda') @patch('stream_alert.rule_processor.alert_forward.LOGGER') - def test_send_alerts_dynamo_exception(self, mock_logger, mock_lambda, mock_dynamo): + def test_send_alerts_dynamo_exception(self, mock_logger, mock_dynamo): """AlertForwarder - Send Alerts with Dynamo Exception""" mock_dynamo.side_effect = ClientError({}, 'batch_write') self.forwarder.send_alerts(None) - mock_lambda.assert_called_once_with(None) mock_dynamo.assert_called_once_with(None) mock_logger.assert_has_calls([ call.exception('Error saving alerts to Dynamo') diff --git a/tests/unit/stream_alert_rule_processor/test_handler.py b/tests/unit/stream_alert_rule_processor/test_handler.py index dd152da85..6c681ff9c 100644 --- a/tests/unit/stream_alert_rule_processor/test_handler.py +++ b/tests/unit/stream_alert_rule_processor/test_handler.py @@ -20,7 +20,7 @@ import os from mock import call, patch -from moto import mock_kinesis +from moto import mock_dynamodb2, mock_kinesis from nose.tools import ( assert_equal, assert_false, @@ -43,6 +43,8 @@ rule = StreamRules.rule +@mock_dynamodb2 +@patch.dict(os.environ, {'CLUSTER': 'corp'}) class TestStreamAlert(object): """Test class for StreamAlert class""" @@ -52,7 +54,7 @@ class TestStreamAlert(object): 'ALERTS_TABLE': 'unit-testing_streamalert_alerts'}) def setup(self): """Setup before each method""" - self.__sa_handler = StreamAlert(get_mock_context(), False) + self.__sa_handler = StreamAlert(get_mock_context()) def test_run_no_records(self): """StreamAlert Class - Run, No Records""" @@ -120,13 +122,14 @@ def test_run_load_payload_bad( 'record' ) + @patch('stream_alert.rule_processor.alert_forward.AlertForwarder.send_alerts') @patch('stream_alert.rule_processor.handler.StreamRules.process') @patch('stream_alert.rule_processor.handler.StreamClassifier.extract_service_and_entity') - def test_run_with_alert(self, extract_mock, rules_mock): + def test_run_with_alert(self, extract_mock, rules_mock, alerts_mock): """StreamAlert Class - Run, With Alert""" + alerts_mock.return_value = [] extract_mock.return_value = ('kinesis', 'unit_test_default_stream') rules_mock.return_value = (['success!!'], ['normalized_records']) - passed = self.__sa_handler.run(get_valid_event()) assert_true(passed) @@ -179,9 +182,6 @@ def test_run_send_alerts(self, extract_mock, rules_mock, forwarder_mock): extract_mock.return_value = ('kinesis', 'unit_test_default_stream') rules_mock.return_value = (['success!!'], ['normalized_records']) - # Set send_alerts to true so the send_alerts happens - self.__sa_handler.enable_alert_processor = True - # Swap out the alias so the logging occurs self.__sa_handler.env['lambda_alias'] = 'production' @@ -190,12 +190,14 @@ def test_run_send_alerts(self, extract_mock, rules_mock, forwarder_mock): forwarder_mock.assert_called_with(['success!!']) @patch('logging.Logger.debug') + @patch('stream_alert.rule_processor.alert_forward.AlertForwarder.send_alerts') @patch('stream_alert.rule_processor.handler.StreamRules.process') @patch('stream_alert.rule_processor.handler.StreamClassifier.extract_service_and_entity') - def test_run_debug_log_alert(self, extract_mock, rules_mock, log_mock): + def test_run_debug_log_alert(self, extract_mock, rules_mock, alerts_mock, log_mock): """StreamAlert Class - Run, Debug Log Alert""" extract_mock.return_value = ('kinesis', 'unit_test_default_stream') rules_mock.return_value = (['success!!'], ['normalized_records']) + alerts_mock.return_value = [] # Cache the logger level log_level = LOGGER.getEffectiveLevel() @@ -280,7 +282,7 @@ def match_ipaddress(_): # pylint: disable=unused-variable mock_threat_intel.return_value = StreamThreatIntel('test_table_name', 'us-east-1') mock_query.return_value = ([], []) - sa_handler = StreamAlert(get_mock_context(), False) + sa_handler = StreamAlert(get_mock_context()) event = { 'account': 123456, 'region': '123456123456', From aaa619250fa2f635a851bc88f87201f52178e0d6 Mon Sep 17 00:00:00 2001 From: Austin Byers Date: Fri, 16 Mar 2018 17:44:49 -0700 Subject: [PATCH 2/6] Add alert merger, running on a regular interval --- conf/lambda.json | 31 ++++ manage.py | 3 +- stream_alert/alert_merger/__init__.py | 19 ++ stream_alert/alert_merger/main.py | 151 ++++++++++++++++ stream_alert/shared/__init__.py | 1 + stream_alert_cli/config.py | 1 + stream_alert_cli/manage_lambda/deploy.py | 14 +- stream_alert_cli/manage_lambda/package.py | 10 ++ stream_alert_cli/manage_lambda/rollback.py | 10 ++ stream_alert_cli/terraform/alert_merger.py | 47 +++++ stream_alert_cli/terraform/generate.py | 10 ++ stream_alert_cli/terraform/handler.py | 2 +- stream_alert_cli/terraform/lambda_module.py | 4 +- .../modules/tf_alert_merger_iam/README.md | 4 + terraform/modules/tf_alert_merger_iam/main.tf | 35 ++++ .../modules/tf_alert_merger_iam/variables.tf | 15 ++ terraform/modules/tf_lambda/main.tf | 3 + tests/unit/helpers/base.py | 14 +- .../stream_alert_alert_merger/__init__.py | 0 .../stream_alert_alert_merger/test_main.py | 163 ++++++++++++++++++ .../manage_lambda/test_rollback.py | 7 +- 21 files changed, 531 insertions(+), 13 deletions(-) create mode 100644 stream_alert/alert_merger/__init__.py create mode 100644 stream_alert/alert_merger/main.py create mode 100644 stream_alert_cli/terraform/alert_merger.py create mode 100644 terraform/modules/tf_alert_merger_iam/README.md create mode 100644 terraform/modules/tf_alert_merger_iam/main.tf create mode 100644 terraform/modules/tf_alert_merger_iam/variables.tf create mode 100644 tests/unit/stream_alert_alert_merger/__init__.py create mode 100644 tests/unit/stream_alert_alert_merger/test_main.py diff --git a/conf/lambda.json b/conf/lambda.json index f5a85b9b8..33480cbd0 100644 --- a/conf/lambda.json +++ b/conf/lambda.json @@ -1,4 +1,35 @@ { + "alert_merger_config": { + "concurrency_limit": 1, + "current_version": "$LATEST", + "handler": "stream_alert.alert_merger.main.handler", + "log_level": "info", + "log_retention_days": 14, + "memory": 128, + "metric_alarms": { + "errors": { + "enabled": true, + "evaluation_periods": 1, + "period_secs": 120, + "threshold": 0 + }, + "throttles": { + "enabled": true, + "evaluation_periods": 1, + "period_secs": 120, + "threshold": 1 + } + }, + "schedule_expression": "rate(1 minute)", + "source_bucket": "PREFIX_GOES_HERE.streamalert.source", + "source_current_hash": "", + "source_object_key": "", + "timeout": 60, + "vpc_config": { + "security_group_ids": [], + "subnet_ids": [] + } + }, "alert_processor_config": { "current_version": "$LATEST", "handler": "stream_alert.alert_processor.main.handler", diff --git a/manage.py b/manage.py index 4b43b92cd..45503ba95 100755 --- a/manage.py +++ b/manage.py @@ -986,7 +986,8 @@ def _add_default_lambda_args(lambda_parser): # require the name of the processor being deployed/rolled back lambda_parser.add_argument( '-p', '--processor', - choices=['alert', 'all', 'athena', 'rule', 'apps', 'threat_intel_downloader'], + choices=['all', 'alert', 'alert_merger', 'apps', 'athena', 'rule', + 'threat_intel_downloader'], help=ARGPARSE_SUPPRESS, nargs='+', action=UniqueSetAction, diff --git a/stream_alert/alert_merger/__init__.py b/stream_alert/alert_merger/__init__.py new file mode 100644 index 000000000..aaa62af49 --- /dev/null +++ b/stream_alert/alert_merger/__init__.py @@ -0,0 +1,19 @@ +"""Initialize logging for the alert merger.""" +import logging +import os + +# Create a package level logger to import +LEVEL = os.environ.get('LOGGER_LEVEL', 'INFO').upper() + +# Cast integer levels to avoid a ValueError +if LEVEL.isdigit(): + LEVEL = int(LEVEL) + +logging.basicConfig(format='%(name)s [%(levelname)s]: [%(module)s.%(funcName)s] %(message)s') + +LOGGER = logging.getLogger('StreamAlert') +try: + LOGGER.setLevel(LEVEL) +except (TypeError, ValueError) as err: + LOGGER.setLevel('INFO') + LOGGER.error('Defaulting to INFO logging: %s', err) diff --git a/stream_alert/alert_merger/main.py b/stream_alert/alert_merger/main.py new file mode 100644 index 000000000..0ed96706f --- /dev/null +++ b/stream_alert/alert_merger/main.py @@ -0,0 +1,151 @@ +""" +Copyright 2017-present, Airbnb Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" +from __future__ import absolute_import +from decimal import Decimal +import json +import os +import time + +from stream_alert.alert_merger import LOGGER + +import boto3 +from boto3.dynamodb.conditions import Attr, Key +from botocore.exceptions import ClientError + +MERGER = None # Cache the instantiation of the AlertMerger for the life of the Lambda container. + + +class AlertTable(object): + """Provides convenience methods for accessing and modifying the alerts table.""" + + def __init__(self, table_name): + self.table = boto3.resource('dynamodb').Table(table_name) + + @staticmethod + def _paginate(func, func_kwargs): + """Paginate results from a scan() or query(). + + Args: + func (method): Function to invoke (ALERTS_TABLE.scan or ALERTS_TABLE.query) + func_kwargs (dict): Keyword arguments to pass to the scan/query function. + The kwargs will be modified if pagination is necessary. + + Yields: + dict: Each item (row) from the response + """ + while True: + response = func(**func_kwargs) + for item in response.get('Items', []): + yield item + + if response.get('LastEvaluatedKey'): + func_kwargs['ExclusiveStartKey'] = response['LastEvaluatedKey'] + else: + return + + def rule_names(self): + """Returns the set of distinct rule names (str) found in the table.""" + kwargs = { + 'ProjectionExpression': 'RuleName', + 'Select': 'SPECIFIC_ATTRIBUTES' + } + return set(item['RuleName'] for item in self._paginate(self.table.scan, kwargs)) + + def pending_alerts(self, rule_name, alert_proc_timeout_sec): + """Find all alerts for the given rule which need to be dispatched to the alert processor. + + Args: + rule_name (str): Select all alerts from this rule name + alert_proc_timeout_sec (int): Alert processor timeout + This is used to determine whether an alert could still be in progress + + Yields: + dict: Each alert (row) with all columns and values. + """ + kwargs = { + # Include only those alerts which have not yet dispatched or were dispatched more than + # ALERT_PROCESSOR_TIMEOUT seconds ago + 'FilterExpression': (Attr('Dispatched').not_exists() | + Attr('Dispatched').lt(int(time.time()) - alert_proc_timeout_sec)), + 'KeyConditionExpression': Key('RuleName').eq(rule_name), + 'Select': 'ALL_ATTRIBUTES' + } + for item in self._paginate(self.table.query, kwargs): + yield item + + def mark_as_dispatched(self, rule_name, alert_id): + """Mark a specific alert as dispatched (in progress).""" + # Update the alerts table with the dispatch time, but only if the alert still exists. + # (The alert processor could have deleted the alert before the table update finishes). + try: + self.table.update_item( + Key={'RuleName': rule_name, 'AlertID': alert_id}, + UpdateExpression='SET Dispatched = :now ADD Attempts :one', + ExpressionAttributeValues={':now': int(time.time()), ':one': 1}, + ConditionExpression='attribute_exists(AlertID)' + ) + except ClientError as error: + if error.response['Error']['Code'] == 'ConditionalCheckFailedException': + LOGGER.warn('Conditional update failed: %s', error.response['Error']['Message']) + else: + raise + + +class AlertEncoder(json.JSONEncoder): + """Custom JSON encoder which handles sets and Decimals.""" + def default(self, obj): # pylint: disable=arguments-differ,method-hidden + if isinstance(obj, set): + return list(obj) + if isinstance(obj, Decimal): + return float(obj) + return json.JSONEncoder.default(self, obj) + + +# TODO: Alert merging will be implemented here +class AlertMerger(object): + """Dispatch alerts to the alert processor.""" + def __init__(self): + self.alerts_db = AlertTable(os.environ['ALERTS_TABLE']) + self.alert_proc = os.environ['ALERT_PROCESSOR'] + self.alert_proc_timeout = int(os.environ['ALERT_PROCESSOR_TIMEOUT_SEC']) + self.lambda_client = boto3.client('lambda') + + def _dispatch_alert(self, alert): + """Dispatch all alerts which need to be sent to the rule processor.""" + LOGGER.info('Dispatching alert %s to %s (attempt %d)', + alert['AlertID'], self.alert_proc, alert.get('Attempts', 0) + 1) + + self.lambda_client.invoke( + FunctionName=self.alert_proc, + InvocationType='Event', + Payload=json.dumps(alert, cls=AlertEncoder, separators=(',', ':')), + Qualifier='production' + ) + self.alerts_db.mark_as_dispatched(alert['RuleName'], alert['AlertID']) + + def dispatch(self): + """Find and dispatch all pending alerts to the alert processor.""" + for rule_name in self.alerts_db.rule_names(): + for alert in self.alerts_db.pending_alerts(rule_name, self.alert_proc_timeout): + self._dispatch_alert(alert) + + +def handler(event, context): # pylint: disable=unused-argument + """Entry point for the alert merger.""" + global MERGER # pylint: disable=global-statement + if not MERGER: + MERGER = AlertMerger() + MERGER.dispatch() diff --git a/stream_alert/shared/__init__.py b/stream_alert/shared/__init__.py index 4593666f5..308fe823d 100644 --- a/stream_alert/shared/__init__.py +++ b/stream_alert/shared/__init__.py @@ -3,6 +3,7 @@ import os +ALERT_MERGER_NAME = 'alert_merger' ALERT_PROCESSOR_NAME = 'alert_processor' ATHENA_PARTITION_REFRESH_NAME = 'athena_partition_refresh' RULE_PROCESSOR_NAME = 'rule_processor' diff --git a/stream_alert_cli/config.py b/stream_alert_cli/config.py index a3e4aafdb..5b4ac88d2 100644 --- a/stream_alert_cli/config.py +++ b/stream_alert_cli/config.py @@ -108,6 +108,7 @@ def set_prefix(self, prefix): ['{}.streamalerts'.format(prefix)] = 'alerts' lambda_funcs = [ + 'alert_merger', 'alert_processor', 'athena_partition_refresh', 'rule_processor', diff --git a/stream_alert_cli/manage_lambda/deploy.py b/stream_alert_cli/manage_lambda/deploy.py index fd8b2523f..78fb89e56 100644 --- a/stream_alert_cli/manage_lambda/deploy.py +++ b/stream_alert_cli/manage_lambda/deploy.py @@ -36,7 +36,9 @@ def _publish_version(packages, config, clusters): Returns: bool: Result of Lambda version publishing """ - global_packages = {'alert_processor', 'athena_partition_refresh', 'threat_intel_downloader'} + global_packages = { + 'alert_merger', 'alert_processor', 'athena_partition_refresh', 'threat_intel_downloader' + } for package in packages: if package.package_name in global_packages: @@ -68,7 +70,13 @@ def _create_and_upload(function_name, config, cluster=None): 'alert': PackageMap( stream_alert_packages.AlertProcessorPackage, {'module.alert_processor_lambda'}, - True), + True + ), + 'alert_merger': PackageMap( + stream_alert_packages.AlertMergerPackage, + {'module.alert_merger_lambda'}, + True + ), 'apps': PackageMap( stream_alert_packages.AppIntegrationPackage, {'module.app_{}_{}'.format(app_name, cluster) @@ -125,7 +133,7 @@ def deploy(options, config): packages = [] if 'all' in options.processor: - processors = {'alert', 'athena', 'apps', 'rule', 'threat_intel_downloader'} + processors = {'alert', 'alert_merger', 'apps', 'athena', 'rule', 'threat_intel_downloader'} else: processors = options.processor diff --git a/stream_alert_cli/manage_lambda/package.py b/stream_alert_cli/manage_lambda/package.py index 89d7e9eb5..80432c736 100644 --- a/stream_alert_cli/manage_lambda/package.py +++ b/stream_alert_cli/manage_lambda/package.py @@ -300,6 +300,16 @@ class AlertProcessorPackage(LambdaPackage): version = stream_alert_version +class AlertMergerPackage(LambdaPackage): + """Deployment package class for the StreamAlert Alert Merger function""" + package_folders = {'stream_alert/alert_merger', 'stream_alert/shared', 'conf'} + package_files = {'stream_alert/__init__.py'} + package_name = 'alert_merger' + config_key = 'alert_merger_config' + third_party_libs = {'backoff'} + version = stream_alert_version + + class AppIntegrationPackage(LambdaPackage): """Deployment package class for App integration functions""" package_folders = {'app_integrations'} diff --git a/stream_alert_cli/manage_lambda/rollback.py b/stream_alert_cli/manage_lambda/rollback.py index 1e11272e9..6ef42d7ab 100644 --- a/stream_alert_cli/manage_lambda/rollback.py +++ b/stream_alert_cli/manage_lambda/rollback.py @@ -55,6 +55,13 @@ def _rollback_alert(config): return ['module.alert_processor_lambda'] +def _rollback_alert_merger(config): + """Decrement the current_version for the alert merger.""" + lambda_config = config['lambda']['alert_merger_config'] + if _try_decrement_version(lambda_config, 'alert_merger'): + return ['module.alert_merger_lambda'] + + def _rollback_apps(config, clusters): """Decrement the current_version for all of the apps functions in the given clusters.""" tf_targets = [] @@ -107,6 +114,9 @@ def rollback(options, config): if rollback_all or 'alert' in options.processor: tf_targets.extend(_rollback_alert(config) or []) + if rollback_all or 'alert_merger' in options.processor: + tf_targets.extend(_rollback_alert_merger(config) or []) + if rollback_all or 'apps' in options.processor: tf_targets.extend(_rollback_apps(config, clusters) or []) diff --git a/stream_alert_cli/terraform/alert_merger.py b/stream_alert_cli/terraform/alert_merger.py new file mode 100644 index 000000000..854a6e50c --- /dev/null +++ b/stream_alert_cli/terraform/alert_merger.py @@ -0,0 +1,47 @@ +""" +Copyright 2017-present, Airbnb Inc. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" +from stream_alert.shared import ALERT_MERGER_NAME +from stream_alert_cli.terraform.common import infinitedict +from stream_alert_cli.terraform.lambda_module import generate_lambda + + +def generate_alert_merger(config): + """Generate Terraform for the Alert Merger + Args: + config (dict): The loaded config from the 'conf/' directory + Returns: + dict: Alert Merger Terraform definition to be marshaled to JSON + """ + prefix = config['global']['account']['prefix'] + + result = infinitedict() + + # Set variables for the alert merger's IAM permissions + result['module']['alert_merger_iam'] = { + 'source': 'modules/tf_alert_merger_iam', + 'account_id': config['global']['account']['aws_account_id'], + 'region': config['global']['account']['region'], + 'prefix': config['global']['account']['prefix'], + 'role_id': '${module.alert_merger_lambda.role_id}' + } + + # Set variables for the Lambda module + result['module']['alert_merger_lambda'] = generate_lambda( + ALERT_MERGER_NAME, config, { + 'ALERTS_TABLE': '{}_streamalert_alerts'.format(prefix), + 'ALERT_PROCESSOR': '{}_streamalert_alert_processor'.format(prefix), + 'ALERT_PROCESSOR_TIMEOUT_SEC': config['lambda']['alert_processor_config']['timeout'] + } + ) + + return result diff --git a/stream_alert_cli/terraform/generate.py b/stream_alert_cli/terraform/generate.py index 3a327d592..37c595294 100644 --- a/stream_alert_cli/terraform/generate.py +++ b/stream_alert_cli/terraform/generate.py @@ -26,6 +26,7 @@ infinitedict, monitoring_topic_arn ) +from stream_alert_cli.terraform.alert_merger import generate_alert_merger from stream_alert_cli.terraform.alert_processor import generate_alert_processor from stream_alert_cli.terraform.app_integrations import generate_app_integrations from stream_alert_cli.terraform.athena import generate_athena @@ -394,6 +395,15 @@ def terraform_generate(config, init=False): message='Removing old Alert Processor Terraform file' ) + # Setup Alert Merger + generate_global_lambda_settings( + config, + config_name='alert_merger_config', + config_generate_func=generate_alert_merger, + tf_tmp_file='terraform/alert_merger.tf.json', + message='Removing old Alert Merger Terraform file' + ) + return True diff --git a/stream_alert_cli/terraform/handler.py b/stream_alert_cli/terraform/handler.py index 4ac76290e..d56d4bd51 100644 --- a/stream_alert_cli/terraform/handler.py +++ b/stream_alert_cli/terraform/handler.py @@ -120,7 +120,7 @@ def _terraform_init(config): LOGGER_CLI.info('Deploying Lambda Functions') - deploy(deploy_opts(['rule', 'alert', 'athena'], []), config) + deploy(deploy_opts(['rule', 'alert', 'alert_merger', 'athena'], []), config) # we need to manually create the streamalerts table since terraform does not support this # See: https://github.com/terraform-providers/terraform-provider-aws/issues/1486 diff --git a/stream_alert_cli/terraform/lambda_module.py b/stream_alert_cli/terraform/lambda_module.py index 7d499f955..44b297bb8 100644 --- a/stream_alert_cli/terraform/lambda_module.py +++ b/stream_alert_cli/terraform/lambda_module.py @@ -18,7 +18,9 @@ def _lambda_config(function_name, config): """Find the config specific to this Lambda function.""" - if function_name == shared.ALERT_PROCESSOR_NAME: + if function_name == shared.ALERT_MERGER_NAME: + return config['lambda']['alert_merger_config'] + elif function_name == shared.ALERT_PROCESSOR_NAME: return config['lambda']['alert_processor_config'] else: raise NotImplementedError( diff --git a/terraform/modules/tf_alert_merger_iam/README.md b/terraform/modules/tf_alert_merger_iam/README.md new file mode 100644 index 000000000..8f63f9670 --- /dev/null +++ b/terraform/modules/tf_alert_merger_iam/README.md @@ -0,0 +1,4 @@ +# Alert Merger Permissions +This module adds IAM permissions specific to the alert merger: + * Managing the alerts table + * Invoking the alert processor diff --git a/terraform/modules/tf_alert_merger_iam/main.tf b/terraform/modules/tf_alert_merger_iam/main.tf new file mode 100644 index 000000000..e41b3d2c9 --- /dev/null +++ b/terraform/modules/tf_alert_merger_iam/main.tf @@ -0,0 +1,35 @@ +// Allow the Alert Merger to query and update the alerts table +resource "aws_iam_role_policy" "manage_alerts_table" { + name = "ManageAlertsTable" + role = "${var.role_id}" + policy = "${data.aws_iam_policy_document.manage_alerts_table.json}" +} + +data "aws_iam_policy_document" "manage_alerts_table" { + statement { + effect = "Allow" + + actions = [ + "dynamodb:Query", + "dynamodb:Scan", + "dynamodb:UpdateItem", + ] + + resources = ["arn:aws:dynamodb:${var.region}:${var.account_id}:table/${var.prefix}_streamalert_alerts"] + } +} + +// Allow the Alert Merger to invoke the Alert Processor +resource "aws_iam_role_policy" "invoke_alert_processor" { + name = "InvokeAlertProcessor" + role = "${var.role_id}" + policy = "${data.aws_iam_policy_document.invoke_alert_processor.json}" +} + +data "aws_iam_policy_document" "invoke_alert_processor" { + statement { + effect = "Allow" + actions = ["lambda:InvokeFunction"] + resources = ["arn:aws:lambda:${var.region}:${var.account_id}:function:${var.prefix}_streamalert_alert_processor"] + } +} diff --git a/terraform/modules/tf_alert_merger_iam/variables.tf b/terraform/modules/tf_alert_merger_iam/variables.tf new file mode 100644 index 000000000..abdf22465 --- /dev/null +++ b/terraform/modules/tf_alert_merger_iam/variables.tf @@ -0,0 +1,15 @@ +variable "account_id" { + description = "12-digit AWS Account ID" +} + +variable "region" { + description = "AWS region identifier" +} + +variable "prefix" { + description = "Prefix for resource names" +} + +variable "role_id" { + description = "Alert processor IAM Role ID" +} diff --git a/terraform/modules/tf_lambda/main.tf b/terraform/modules/tf_lambda/main.tf index 2e9407f85..3fcf71338 100644 --- a/terraform/modules/tf_lambda/main.tf +++ b/terraform/modules/tf_lambda/main.tf @@ -94,4 +94,7 @@ resource "aws_lambda_permission" "allow_cloudwatch_invocation" { principal = "events.amazonaws.com" source_arn = "${aws_cloudwatch_event_rule.invocation_schedule.arn}" qualifier = "${var.alias_name}" + + // The alias must be created before we can grant permission to invoke it + depends_on = ["aws_lambda_alias.alias_vpc", "aws_lambda_alias.alias_no_vpc"] } diff --git a/tests/unit/helpers/base.py b/tests/unit/helpers/base.py index 8df70b8f1..09a9d429f 100644 --- a/tests/unit/helpers/base.py +++ b/tests/unit/helpers/base.py @@ -115,6 +115,15 @@ def basic_streamalert_config(): } }, 'lambda': { + 'alert_merger_config': { + 'current_version': '$LATEST', + 'handler': 'stream_alert.alert_merger.main.handler', + 'memory': 128, + 'source_bucket': 'unit-testing.streamalert.source', + 'source_current_hash': '', + 'source_object_key': '', + 'timeout': 10 + }, 'alert_processor_config': { 'current_version': '$LATEST', 'handler': 'stream_alert.alert_processor.main.handler', @@ -122,7 +131,6 @@ def basic_streamalert_config(): 'source_bucket': 'unit-testing.streamalert.source', 'source_current_hash': '', 'source_object_key': '', - 'third_party_libraries': [], 'timeout': 10 }, 'athena_partition_refresh_config': { @@ -139,7 +147,6 @@ def basic_streamalert_config(): 'source_bucket': 'unit-testing.streamalert.source', 'source_current_hash': '', 'source_object_key': '', - 'third_party_libraries': [], 'timeout': 60 }, 'rule_processor_config': { @@ -171,9 +178,6 @@ def basic_streamalert_config(): 'table_rcu': 1000, 'table_wcu': 200, 'target_utilization': 70, - 'third_party_libraries': [ - 'requests' - ], 'timeout': 120 } }, diff --git a/tests/unit/stream_alert_alert_merger/__init__.py b/tests/unit/stream_alert_alert_merger/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/unit/stream_alert_alert_merger/test_main.py b/tests/unit/stream_alert_alert_merger/test_main.py new file mode 100644 index 000000000..5a0a7d79e --- /dev/null +++ b/tests/unit/stream_alert_alert_merger/test_main.py @@ -0,0 +1,163 @@ +""" +Copyright 2017-present, Airbnb Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" +# pylint: disable=no-self-use,protected-access +from decimal import Decimal +import json +import os + +from boto3.dynamodb.conditions import Key +from botocore.exceptions import ClientError +from mock import ANY, call, patch, MagicMock +from moto import mock_dynamodb2, mock_lambda +from nose.tools import assert_equal, assert_is_instance, assert_raises + +from stream_alert.alert_merger import main +from stream_alert_cli.helpers import create_lambda_function, setup_mock_alerts_table + +_ALERTS_TABLE = 'PREFIX_streamalert_alerts' +_ALERT_PROCESSOR = 'PREFIX_streamalert_alert_processor' +_ALERT_PROCESSOR_TIMEOUT_SEC = 60 + + +@mock_dynamodb2 +class TestAlertTable(object): + """Tests for merger/main.py:AlertTable""" + + @patch.dict(os.environ, {'AWS_DEFAULT_REGION': 'us-east-1'}) + def setup(self): + """Alert Merger - Alert Table - Add mock alerts to the table""" + # pylint: disable=attribute-defined-outside-init + setup_mock_alerts_table(_ALERTS_TABLE) + self.alert_table = main.AlertTable(_ALERTS_TABLE) + + with self.alert_table.table.batch_writer() as batch: + for i in range(3): + batch.put_item(Item={ + 'RuleName': 'even' if i % 2 == 0 else 'odd', + 'AlertID': 'alert-{}'.format(str(i)), + 'Created': '2018-03-14', + 'Cluster': 'test-cluster', + 'LogSource': 'test-log-source', + 'LogType': 'json', + 'RuleDescription': 'even' if i % 2 == 0 else 'odd', + 'SourceEntity': 'test-source-entity', + 'SourceService': 'test-source-service', + 'Outputs': {'aws-s3:test-bucket', 'slack:test-channel'}, + 'Record': json.dumps({ + 'key1': 'value1', + 'key2': 'value2' + }) + }) + + def test_paginate_multiple(self): + """Alert Merger - Alert Table - Paginate Traverses Multiple Pages""" + def mock_table_op(**kwargs): + """Returns two pages of mock results (moto's dynamo does not paginate)""" + if 'ExclusiveStartKey' in kwargs: + return {'Items': [2]} + return {'Items': [1], 'LastEvaluatedKey': 'somewhere'} + results = list(self.alert_table._paginate(mock_table_op, {})) + assert_equal([1, 2], results) + + def test_rule_names(self): + """Alert Merger - Alert Table - Rule Names From Table Scan""" + assert_equal({'even', 'odd'}, self.alert_table.rule_names()) + + def test_pending_alerts(self): + """Alert Merger - Alert Table - Pending Alerts From Table Query""" + alerts = list(self.alert_table.pending_alerts('odd', _ALERT_PROCESSOR_TIMEOUT_SEC)) + assert_equal(1, len(alerts)) + assert_equal('odd', alerts[0]['RuleName']) + assert_equal('alert-1', alerts[0]['AlertID']) + + def test_mark_as_dispatched(self): + """Alert Merger - Alert Table - Mark Alert As Dispatched""" + self.alert_table.mark_as_dispatched('even', 'alert-2') + + # Verify that there are now Attempts and Dispatched keys + response = self.alert_table.table.query( + KeyConditionExpression=Key('RuleName').eq('even') & Key('AlertID').eq('alert-2')) + item = response['Items'][0] + assert_equal(Decimal('1'), item['Attempts']) + assert_is_instance(item['Dispatched'], Decimal) + + @patch.object(main, 'LOGGER') + def test_mark_as_dispatched_conditional_fail(self, mock_logger): + """Alert Merger - Alert Table - Dispatched Alert is Already Deleted""" + self.alert_table.mark_as_dispatched('nonsense', 'bears') + mock_logger.warn.assert_called_once_with('Conditional update failed: %s', ANY) + + def test_mark_as_dispatched_exception(self): + """Alert Merger - Alert Table - An Unhandled Exception is Re-Raised""" + self.alert_table.table = MagicMock() + + def mock_update(**kwargs): # pylint: disable=unused-argument + raise ClientError({'Error': {'Code': 'TEST'}}, 'UpdateItem') + self.alert_table.table.update_item.side_effect = mock_update + + assert_raises(ClientError, self.alert_table.mark_as_dispatched, '', '') + + +class TestAlertEncoder(object): + """Tests for merger/main.py:AlertEncoder""" + + def test_to_json(self): + """Alert Merger - Alert Encoder - JSON Encoding for Set and Decimal""" + data = {'letter': {'a'}, 'number': {Decimal('1')}} + result = json.dumps(data, cls=main.AlertEncoder, sort_keys=True) + assert_equal('{"letter": ["a"], "number": [1.0]}', result) + + def test_to_json_invalid(self): + """Alert Merger - Alert Encoder - TypeError is raised when appropriate""" + assert_raises(TypeError, json.dumps, object, cls=main.AlertEncoder) + + +@mock_lambda +class TestAlertMerger(object): + """Tests for merger/main.py:AlertMerger""" + + @patch.dict(os.environ, { + 'ALERT_PROCESSOR': _ALERT_PROCESSOR, + 'ALERT_PROCESSOR_TIMEOUT_SEC': str(_ALERT_PROCESSOR_TIMEOUT_SEC), + 'ALERTS_TABLE': _ALERTS_TABLE, + 'AWS_DEFAULT_REGION': 'us-east-1' + }) + @patch.object(main, 'AlertTable', MagicMock()) + def setup(self): + """Alert Merger - Setup - Create AlertMerger instance and mock alert processor""" + # pylint: disable=attribute-defined-outside-init + self.merger = main.AlertMerger() + create_lambda_function(_ALERT_PROCESSOR, 'us-east-1') + + @patch.object(main, 'LOGGER') + def test_dispatch(self, mock_logger): + """Alert Merger - Alert Merger - Dispatch to Alert Processor Lambda""" + self.merger.alerts_db.rule_names.return_value = ['name'] + self.merger.alerts_db.pending_alerts.return_value = [{'AlertID': 'id', 'RuleName': 'name'}] + + self.merger.dispatch() + mock_logger.info.assert_called_once_with( + 'Dispatching alert %s to %s (attempt %d)', 'id', _ALERT_PROCESSOR, 1) + + +@patch.object(main, 'AlertMerger') +def test_handler(mock_instance): + """Alert Merger - Handler (Entry Point)""" + main.handler(None, None) + mock_instance.assert_has_calls([ + call(), + call().dispatch() + ]) diff --git a/tests/unit/stream_alert_cli/manage_lambda/test_rollback.py b/tests/unit/stream_alert_cli/manage_lambda/test_rollback.py index 846971a56..7ea657e9f 100644 --- a/tests/unit/stream_alert_cli/manage_lambda/test_rollback.py +++ b/tests/unit/stream_alert_cli/manage_lambda/test_rollback.py @@ -26,6 +26,7 @@ def setUp(self): self.config = MockCLIConfig(config=basic_streamalert_config()) # Find all function config sections (with 'current_version') + self.alert_merger_config = self.config['lambda']['alert_merger_config'] self.alert_config = self.config['lambda']['alert_processor_config'] self.apps_config_box = ( self.config['clusters']['corp']['modules']['stream_alert_apps']['box_collector']) @@ -39,8 +40,8 @@ def setUp(self): self.config['clusters']['corp']['modules']['stream_alert']['rule_processor']) self.func_configs = [ - self.alert_config, self.apps_config_box, self.apps_config_duo, self.athena_config, - self.downloader_config, self.rule_config_prod, self.rule_config_corp + self.alert_merger_config, self.alert_config, self.apps_config_box, self.apps_config_duo, + self.athena_config, self.downloader_config, self.rule_config_prod, self.rule_config_corp ] def test_rollback_all(self, mock_runner, mock_generate, mock_logger): @@ -59,6 +60,7 @@ def test_rollback_all(self, mock_runner, mock_generate, mock_logger): mock_logger.assert_not_called() mock_generate.assert_called_once_with(config=self.config) mock_runner.assert_called_once_with(targets=[ + 'module.alert_merger_lambda', 'module.alert_processor_lambda', 'module.box_collector_corp', 'module.duo_admin_collector_corp', @@ -78,6 +80,7 @@ def test_rollback_all_invalid(self, mock_runner, mock_generate, mock_logger): fmt = '%s cannot be rolled back from version %s' mock_logger.assert_has_calls([ + call.warn(fmt, 'alert_merger', '$LATEST'), call.warn(fmt, 'alert_processor', '1'), call.warn(fmt, 'duo_admin_collector_corp', '$LATEST'), call.warn(fmt, 'box_collector_corp', '$LATEST'), From e0839db8d79944f2568c1137bdd206995b88394b Mon Sep 17 00:00:00 2001 From: Austin Byers Date: Fri, 16 Mar 2018 18:23:09 -0700 Subject: [PATCH 3/6] Update Alert Processor with new invocation format --- stream_alert/alert_processor/helpers.py | 75 +--- stream_alert/alert_processor/main.py | 288 ++++++++------- .../alert_processor/outputs/output_base.py | 16 +- stream_alert_cli/test.py | 7 +- .../modules/tf_alert_processor_iam/main.tf | 21 ++ .../stream_alert_alert_processor/__init__.py | 17 +- .../stream_alert_alert_processor/helpers.py | 13 - .../test_helpers.py | 95 +---- .../stream_alert_alert_processor/test_main.py | 328 ++++++++---------- .../test_outputs/__init__.py | 21 -- .../test_outputs/test_output_base.py | 6 +- 11 files changed, 371 insertions(+), 516 deletions(-) diff --git a/stream_alert/alert_processor/helpers.py b/stream_alert/alert_processor/helpers.py index 8d61e201c..7108c37be 100644 --- a/stream_alert/alert_processor/helpers.py +++ b/stream_alert/alert_processor/helpers.py @@ -13,73 +13,22 @@ See the License for the specific language governing permissions and limitations under the License. """ -from stream_alert.alert_processor import LOGGER +from collections import OrderedDict -def validate_alert(alert): - """Helper function to perform simple validation of an alert's keys and structure +def ordered_dict(data, exclude_keys=None): + """Convert a dictionary into a sorted OrderedDictionary, removing extraneous keys.""" + result = OrderedDict() + for key, value in sorted(data.items()): + if exclude_keys and key in exclude_keys: + continue - Args: - alert (dict): the alert record to test that should be in the form of a dict - - Returns: - bool: whether or not the alert has the proper structure - """ - - if not isinstance(alert, dict): - LOGGER.error('The alert must be a map (dict)') - return False - - alert_keys = { - 'id', - 'record', - 'rule_name', - 'rule_description', - 'log_type', - 'log_source', - 'outputs', - 'source_service', - 'source_entity', - 'context' - } - if not set(alert.keys()) == alert_keys: - LOGGER.error('The alert object must contain the following keys: %s', - ', '.join(alert_keys)) - return False - - valid = True - - for key in alert_keys: - if key == 'record': - if not isinstance(alert['record'], dict): - LOGGER.error('The alert record must be a map (dict)') - return False - - elif key == 'context': - if not isinstance(alert['context'], dict): - LOGGER.error('The alert context must be a map (dict)') - return False - - elif key == 'outputs': - if not isinstance(alert[key], list): - LOGGER.error( - 'The value of the \'outputs\' key must be an array (list) that ' - 'contains at least one configured output.') - valid = False - continue - - for entry in alert[key]: - if not isinstance(entry, (str, unicode)): - LOGGER.error('The value of each entry in the \'outputs\' list ' - 'must be a string (str).') - valid = False - - elif not isinstance(alert[key], (str, unicode)): - LOGGER.error('The value of the \'%s\' key must be a string (str), not %s', - key, type(alert[key])) - valid = False + if isinstance(value, dict): + result[key] = ordered_dict(value, exclude_keys) + else: + result[key] = value - return valid + return result def elide_string_middle(text, max_length): diff --git a/stream_alert/alert_processor/main.py b/stream_alert/alert_processor/main.py index 3c8b29c44..09742d68f 100644 --- a/stream_alert/alert_processor/main.py +++ b/stream_alert/alert_processor/main.py @@ -14,154 +14,186 @@ limitations under the License. """ from __future__ import absolute_import # Suppresses RuntimeWarning import error in Lambda -from collections import OrderedDict import json +import os from stream_alert.alert_processor import LOGGER -from stream_alert.alert_processor.helpers import validate_alert +from stream_alert.alert_processor.helpers import ordered_dict from stream_alert.alert_processor.outputs.output_base import StreamAlertOutput -from stream_alert.shared import NORMALIZATION_KEY - - -def handler(event, context): - """StreamAlert Alert Processor - - Args: - event (dict): contains a 'Records' top level key that holds - all of the records for this event. Each record dict then - contains a 'Message' key pointing to the alert payload that - has been sent from the main StreamAlert Rule processor function - context (AWSLambdaContext): basically a namedtuple of properties from AWS - - Returns: - list: Status values. Each entry in the list is a tuple - consisting of two values. The first value is a boolean that - indicates if sending was successful and the second value is the - output configuration info (ie - 'slack:sample_channel') - """ - # A failure to load the config will log the error in load_output_config - # and return here - config = _load_output_config() - if not config: - return - - split_arn = context.invoked_function_arn.split(':') - region = split_arn[3] - account_id = split_arn[4] - function_name = context.function_name - - # Return the current list of statuses back to the caller - return list(run(event, region, account_id, function_name, config)) - - -def run(alert, region, account_id, function_name, config): - """Send an Alert to its described outputs. - - Args: - alert (dict): dictionary representating an alert with the - following structure: - - { - 'id': uuid, - 'record': record, - 'rule_name': rule.rule_name, - 'rule_description': rule.rule_function.__doc__, - 'log_source': str(payload.log_source), - 'log_type': payload.type, - 'outputs': rule.outputs, - 'source_service': payload.service, - 'source_entity': payload.entity - } - - region (str): The AWS region of the currently executing Lambda function - account_id (str): The 12-digit AWS account ID of the currently executing Lambda function - function_name (str): The name of the lambda function - config (dict): The loaded configuration for outputs from conf/outputs.json - - Yields: - (bool, str): Dispatch status and name of the output to the handler - """ - if not validate_alert(alert): - LOGGER.error('Invalid alert format:\n%s', json.dumps(alert, indent=2)) - return - - LOGGER.info('Sending alert %s to outputs %s', alert['id'], alert['outputs']) - - # strip out unnecessary keys and sort - alert = _sort_dict(alert) - - outputs = alert['outputs'] - # Get the output configuration for this rule and send the alert to each - for output in set(outputs): +from stream_alert.shared import backoff_handlers, NORMALIZATION_KEY + +import backoff +import boto3 +from botocore.exceptions import ClientError + +ALERT_PROCESSOR = None # Cached instantiation of an Alert Processor + + +class AlertProcessor(object): + """Orchestrates delivery of alerts to the appropriate dispatchers.""" + BACKOFF_MAX_TRIES = 6 + OUTPUT_CONFIG_PATH = 'conf/outputs.json' + + def __init__(self, invoked_function_arn): + """Initialization logic that can be cached across invocations. + + Args: + invoked_function_arn (str): The ARN of the alert processor when it was invoked. + This is used to calculate region, account, and prefix. + """ + with open(self.OUTPUT_CONFIG_PATH) as f: + self.config = json.load(f) + + # arn:aws:lambda:REGION:ACCOUNT:function:PREFIX_streamalert_alert_processor:production + split_arn = invoked_function_arn.split(':') + self.region = split_arn[3] + self.account_id = split_arn[4] + self.prefix = split_arn[6].split('_')[0] + + self.alerts_table = boto3.resource('dynamodb').Table(os.environ['ALERTS_TABLE']) + + @staticmethod + def _build_alert_payload(record): + """Transform a raw Dynamo record into a payload ready for dispatching. + + Args: + record (dict): A row in the Dynamo alerts table + + Returns: + OrderedDict: An alert payload ready to be sent to output dispatchers. + """ + # Any problems with the alert keys or JSON loading will raise an exception here. + # This is what we want - an invalid alert is a show-stopper and shouldn't ever happen. + return ordered_dict({ + 'cluster': record['Cluster'], + 'created': record['Created'], + 'id': record['AlertID'], + 'log_source': record['LogSource'], + 'log_type': record['LogType'], + 'outputs': record['Outputs'], + 'record': json.loads(record['Record']), + 'rule_description': record['RuleDescription'], + 'rule_name': record['RuleName'], + 'source_entity': record['SourceEntity'], + 'source_service': record['SourceService'] + }, exclude_keys={NORMALIZATION_KEY}) + + def _create_dispatcher(self, output): + """Create a dispatcher for the given output. + + Args: + output (str): Alert output, e.g. "aws-sns:topic-name" + + Returns: + OutputDispatcher: Based on the output type. + Returns None if the output is invalid or not defined in the config. + """ try: service, descriptor = output.split(':') except ValueError: LOGGER.error('Improperly formatted output [%s]. Outputs for rules must ' 'be declared with both a service and a descriptor for the ' 'integration (ie: \'slack:my_channel\')', output) - continue + return None - if service not in config or descriptor not in config[service]: + if service not in self.config or descriptor not in self.config[service]: LOGGER.error('The output \'%s\' does not exist!', output) - continue + return None - # Retrieve the proper class to handle dispatching the alerts of this services - dispatcher = StreamAlertOutput.create_dispatcher( - service, region, account_id, function_name, config) + return StreamAlertOutput.create_dispatcher( + service, self.region, self.account_id, self.prefix, self.config) - if not dispatcher: - continue + @staticmethod + def _send_alert(alert_payload, output, dispatcher): + """Send a single alert to the given output. - LOGGER.debug('Sending alert to %s:%s', service, descriptor) - - sent = False + Returns: + bool: True if the alert was sent successfully. + """ + LOGGER.info('Sending alert %s to %s', alert_payload['id'], output) try: - sent = dispatcher.dispatch(descriptor=descriptor, - rule_name=alert['rule_name'], - alert=alert) - - except Exception as err: # pylint: disable=broad-except - LOGGER.exception('An error occurred while sending alert ' - 'to %s:%s: %s. alert:\n%s', service, descriptor, - err, json.dumps(alert, indent=2)) + return dispatcher.dispatch(descriptor=output.split(':')[1], + rule_name=alert_payload['rule_name'], + alert=alert_payload) + except Exception: # pylint: disable=broad-except + LOGGER.exception('Exception when sending alert %s to %s. Alert:\n%s', + alert_payload['id'], output, json.dumps(alert_payload, indent=2)) + return False + + @backoff.on_exception(backoff.expo, ClientError, + max_tries=BACKOFF_MAX_TRIES, jitter=backoff.full_jitter, + on_backoff=backoff_handlers.backoff_handler, + on_success=backoff_handlers.success_handler, + on_giveup=backoff_handlers.giveup_handler) + def _update_alerts_table(self, rule_name, alert_id, results): + """Update the alerts table based on which outputs sent successfully.""" + key = {'RuleName': rule_name, 'AlertID': alert_id} + if all(results.values()): + # All outputs sent successfully - delete Dynamo entry + self.alerts_table.delete_item(Key=key) + else: + # List failed outputs as needing a retry + self.alerts_table.update_item( + Key=key, + UpdateExpression='SET RetryOutputs = :failed_outputs', + ExpressionAttributeValues={ + ':failed_outputs': set( + output for output, success in results.items() if not success) + } + ) + + def run(self, event): + """Run the alert processor! + + Args: + event (dict): Invocation event (record from Dynamo table) + + Returns: + dict(str, bool): Maps each output to whether it was sent successfully. + Invalid outputs are excluded from the result. + """ + payload = self._build_alert_payload(event) + + # Try sending to each output, keeping track of which was successful + results = {} + for output in event.get('RetryOutputs') or event['Outputs']: + dispatcher = self._create_dispatcher(output) + if dispatcher: + results[output] = self._send_alert(payload, output, dispatcher) + + self._update_alerts_table(event['RuleName'], event['AlertID'], results) + return results - # Yield back the result to the handler - yield sent, output - -def _sort_dict(unordered_dict): - """Recursively sort a dictionary +def handler(event, context): + """StreamAlert Alert Processor - entry point Args: - unordered_dict (dict): an alert dictionary - - Returns: - OrderedDict: a sorted version of the dictionary - """ - result = OrderedDict() - for key, value in sorted(unordered_dict.items(), key=lambda t: t[0]): - if key == NORMALIZATION_KEY: - continue - if isinstance(value, dict): - result[key] = _sort_dict(value) - continue - - result[key] = value - - return result - - -def _load_output_config(config_path='conf/outputs.json'): - """Load the outputs configuration file from disk + event (dict): Record from the alerts Dynamo table: { + 'AlertID': str, # UUID + 'Attempts': int, # Number of attempts to send this alert so far + 'Cluster': str, # Cluster which generated the alert + 'Created': str, # Human-readable timestamp when the alert was created + 'Dispatched': int, # Time (seconds UTC) when the alert was last dispatched + 'LogSource': str, # Log source (e.g. "binaryalert") which generated the alert + 'LogType' str, # "json" + 'Outputs': list[str], # Unique list of service:descriptor output targets + 'Record': str, # JSON-encoded record body + 'RetryOutputs': list[str] # Optional list of outputs which need to be retried + 'RuleDescription': str # Non-empty rule description + 'RuleName': str, # Non-empty rule name + 'SourceEntity': str, # Name of the alert source (e.g. "my-topic", "sample-channel") + 'SourceService': str, # Service which generated the alert (e.g. "sns", "slack") + } + context (AWSLambdaContext): Lambda invocation context Returns: - dict: the output configuration settings + dict(str, bool): Maps each output to whether it was sent successfully. + For example, {'aws-firehose:sample': False, 'slack:example-channel': True}. + NOTE: Invalid outputs are excluded from the result (they should not be retried) """ - with open(config_path) as outputs: - try: - config = json.load(outputs) - except ValueError: - LOGGER.error('The \'%s\' file could not be loaded into json', config_path) - return - - return config + global ALERT_PROCESSOR # pylint: disable=global-statement + if not ALERT_PROCESSOR: + # Create the alert processor if we haven't already + ALERT_PROCESSOR = AlertProcessor(context.invoked_function_arn) + return ALERT_PROCESSOR.run(event) diff --git a/stream_alert/alert_processor/outputs/output_base.py b/stream_alert/alert_processor/outputs/output_base.py index e4fe654c0..4302293c7 100644 --- a/stream_alert/alert_processor/outputs/output_base.py +++ b/stream_alert/alert_processor/outputs/output_base.py @@ -69,14 +69,14 @@ def __new__(cls, output): return output @classmethod - def create_dispatcher(cls, service, region, account_id, function_name, config): + def create_dispatcher(cls, service, region, account_id, prefix, config): """Returns the subclass that should handle this particular service Args: service (str): The service identifier for this output region (str): The AWS region to use for some output types account_id (str): The AWS account ID for computing AWS output ARNs - function_name (str): The invoking AWS Lambda function name + prefix (str): The resource prefix config (dict): The loaded output configuration dict Returns: @@ -86,7 +86,7 @@ def create_dispatcher(cls, service, region, account_id, function_name, config): if not dispatcher: return False - return dispatcher(region, account_id, function_name, config) + return dispatcher(region, account_id, prefix, config) @classmethod def get_dispatcher(cls, service): @@ -142,10 +142,10 @@ class OutputDispatcher(object): # out for both get and post requests. This applies to both connection and read timeouts _DEFAULT_REQUEST_TIMEOUT = 3.05 - def __init__(self, region, account_id, function_name, config): + def __init__(self, region, account_id, prefix, config): self.region = region self.account_id = account_id - self.secrets_bucket = self._get_secrets_bucket_name(function_name) + self.secrets_bucket = '{}_streamalert_secrets'.format(prefix) self.config = config @staticmethod @@ -205,12 +205,6 @@ def _load_creds(self, descriptor): return creds_dict - @classmethod - def _get_secrets_bucket_name(cls, function_name): - """Returns the streamalerts secrets s3 bucket name""" - prefix = function_name.split('_')[0] - return '.'.join([prefix, 'streamalert', 'secrets']) - def _get_creds_from_s3(self, cred_location, descriptor): """Pull the encrypted credential blob for this service and destination from s3 diff --git a/stream_alert_cli/test.py b/stream_alert_cli/test.py index bb11f2c23..49983cf0b 100644 --- a/stream_alert_cli/test.py +++ b/stream_alert_cli/test.py @@ -29,6 +29,7 @@ from mock import patch from stream_alert.alert_processor import main as StreamOutput +from stream_alert.rule_processor.alert_forward import AlertForwarder from stream_alert.rule_processor.handler import StreamAlert # import all rules loaded from the main handler import stream_alert.rule_processor.main # pylint: disable=unused-import @@ -630,7 +631,11 @@ def test_processor(self, alerts): if self.context.mocked: self.setup_outputs(alert) - for current_test_passed, output in StreamOutput.handler(alert, self.context): + # Convert alert to the Dynamo event format expected by the alert processor + event = AlertForwarder.dynamo_record(alert) + event['Outputs'] = list(event['Outputs']) + + for output, current_test_passed in StreamOutput.handler(event, self.context).items(): self.all_tests_passed = current_test_passed and self.all_tests_passed service, descriptor = output.split(':') message = 'sending alert to \'{}\''.format(descriptor) diff --git a/terraform/modules/tf_alert_processor_iam/main.tf b/terraform/modules/tf_alert_processor_iam/main.tf index c460c252c..b76e89474 100644 --- a/terraform/modules/tf_alert_processor_iam/main.tf +++ b/terraform/modules/tf_alert_processor_iam/main.tf @@ -1,6 +1,7 @@ // Permissions specific to the alert processor: decrypting secrets, sending alerts to outputs locals { + dynamo_arn_prefix = "arn:aws:dynamodb:${var.region}:${var.account_id}:table" firehose_arn_prefix = "arn:aws:firehose:${var.region}:${var.account_id}" lambda_arn_prefix = "arn:aws:lambda:${var.region}:${var.account_id}:function" sns_arn_prefix = "arn:aws:sns:${var.region}:${var.account_id}" @@ -16,6 +17,26 @@ locals { sqs_outputs = "${concat(var.output_sqs_queues, list("unused"))}" } +// Allow the Alert Processor to update the alerts table +resource "aws_iam_role_policy" "update_alerts_table" { + name = "UpdateAlertsTable" + role = "${var.role_id}" + policy = "${data.aws_iam_policy_document.update_alerts_table.json}" +} + +data "aws_iam_policy_document" "update_alerts_table" { + statement { + effect = "Allow" + + actions = [ + "dynamodb:DeleteItem", + "dynamodb:UpdateItem", + ] + + resources = ["${local.dynamo_arn_prefix}/${var.prefix}_streamalert_alerts"] + } +} + // Allow the Alert Processor to retrieve and decrypt output secrets resource "aws_iam_role_policy" "output_secrets" { name = "DecryptOutputSecrets" diff --git a/tests/unit/stream_alert_alert_processor/__init__.py b/tests/unit/stream_alert_alert_processor/__init__.py index a3a48aab0..1dd8e83c5 100644 --- a/tests/unit/stream_alert_alert_processor/__init__.py +++ b/tests/unit/stream_alert_alert_processor/__init__.py @@ -1,22 +1,23 @@ -''' +""" Copyright 2017-present, Airbnb Inc. - Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. -''' -from stream_alert.alert_processor.main import _load_output_config +""" +import json REGION = 'us-east-1' ACCOUNT_ID = '123456789012' -FUNCTION_NAME = 'corp-prefix_prod_streamalert_alert_processor' -CONFIG = _load_output_config('tests/unit/conf/outputs.json') +PREFIX = 'prefix' +FUNCTION_NAME = '{}_streamalert_alert_processor'.format(PREFIX) +OUTPUT_CONFIG_PATH = 'tests/unit/conf/outputs.json' +with open(OUTPUT_CONFIG_PATH) as f: + CONFIG = json.load(f) +ALERTS_TABLE = '{}_streamalert_alerts'.format(PREFIX) KMS_ALIAS = 'alias/stream_alert_secrets_test' diff --git a/tests/unit/stream_alert_alert_processor/helpers.py b/tests/unit/stream_alert_alert_processor/helpers.py index e36901740..3c21dac2f 100644 --- a/tests/unit/stream_alert_alert_processor/helpers.py +++ b/tests/unit/stream_alert_alert_processor/helpers.py @@ -19,19 +19,6 @@ import shutil import tempfile -from mock import Mock - -from tests.unit.stream_alert_alert_processor import FUNCTION_NAME, REGION - - -def get_mock_context(): - """Create a fake context object using Mock""" - arn = 'arn:aws:lambda:{}:555555555555:function:{}:production' - context = Mock(invoked_function_arn=(arn.format(REGION, FUNCTION_NAME)), - function_name='corp-prefix_prod_streamalert_alert_processor') - - return context - def get_random_alert(key_count, rule_name, omit_rule_desc=False): """This loop generates key/value pairs with a key of length 6 and diff --git a/tests/unit/stream_alert_alert_processor/test_helpers.py b/tests/unit/stream_alert_alert_processor/test_helpers.py index db2a1bfd9..7eddf16fe 100644 --- a/tests/unit/stream_alert_alert_processor/test_helpers.py +++ b/tests/unit/stream_alert_alert_processor/test_helpers.py @@ -13,97 +13,24 @@ See the License for the specific language governing permissions and limitations under the License. """ -from nose.tools import assert_equal, assert_false, assert_true +from collections import OrderedDict -from stream_alert.alert_processor.helpers import elide_string_middle, validate_alert -from tests.unit.stream_alert_alert_processor.helpers import get_alert +from nose.tools import assert_equal, assert_is_instance +from stream_alert.alert_processor.helpers import elide_string_middle, ordered_dict -def test_valid_alert(): - """Alert Processor Input Validation - Valid Alert Structure""" - # Test with a valid alert structure - assert_true(validate_alert(get_alert())) - -def test_valid_alert_type(): - """Alert Processor Input Validation - Invalid Alert Type""" - assert_false(validate_alert('not-a-real-alert-object')) - - -def test_alert_keys(): - """Alert Processor Input Validation - Alert Keys Missing""" - # Default valid alert to be modified - missing_alert_key = get_alert() - - # Alter keys to break validation (not all required keys) - missing_alert_key.pop('rule_name') - - # Test with invalid metadata keys - assert_false(validate_alert(missing_alert_key)) - - -def test_invalid_record(): - """Alert Processor Input Validation - Invalid Alert Record""" - # Default valid alert to be modified - invalid_alert = get_alert() - - # metadata > source value validation - invalid_alert['record'] = 100 - - # Test with invalid metadata source values - assert_false(validate_alert(invalid_alert)) - - -def test_metadata_source_value(): - """Alert Processor Input Validation - Source Entity Value""" - # Default valid alert to be modified - invalid_metadata_source = get_alert() - - # metadata > source value validation - invalid_metadata_source['source_entity'] = 100 - - # Test with invalid metadata source values - assert_false(validate_alert(invalid_metadata_source)) - - -def test_outputs_type(): - """Alert Processor Input Validation - Metadata Outputs Bad Type""" - # Default valid alert to be modified - invalid_metadata_outputs = get_alert() - - # metadata > outputs type validation - invalid_metadata_outputs['outputs'] = {'bad': 'value'} - - # Test with invalid metadata outputs type - assert_false(validate_alert(invalid_metadata_outputs)) - - -def test_outputs_value_type(): - """Alert Processor Input Validation - Metadata Outputs Bad Value Type""" - # Default valid alert to be modified - invalid_metadata_outputs = get_alert() - - # metadata > outputs value validation - invalid_metadata_outputs['outputs'] = ['good', 100] - - # Test with invalid metadata outputs value - assert_false(validate_alert(invalid_metadata_outputs)) - - -def test_metadata_non_string_type(): - """Alert Processor Input Validation - Metadata Non-String""" - # Default valid alert to be modified - invalid_metadata_non_string = get_alert() - - # metadata > non-string value validation - invalid_metadata_non_string['log_type'] = 4.5 - - # Test with invalid metadata non-string value - assert_false(validate_alert(invalid_metadata_non_string)) +def test_ordered_dict(): + """Alert Processor - Helpers - Ordered Dict""" + data = {'ignore': None, 'c': 3, 'a': 1, 'b': {'b': 2, 'c': 3, 'a': 1, 'skip': None}} + result = ordered_dict(data, exclude_keys={'skip', 'ignore'}) + assert_is_instance(result, OrderedDict) + assert_equal(['a', 'b', 'c'], result.keys()) + assert_equal(['a', 'b', 'c'], result['b'].keys()) def test_elide_string_middle(): - """Alert Processor String Truncation""" + """Alert Processor - Helpers - String Truncation""" alphabet = 'abcdefghijklmnopqrstuvwxyz' # String shortened diff --git a/tests/unit/stream_alert_alert_processor/test_main.py b/tests/unit/stream_alert_alert_processor/test_main.py index ed1b4deb0..09a62c839 100644 --- a/tests/unit/stream_alert_alert_processor/test_main.py +++ b/tests/unit/stream_alert_alert_processor/test_main.py @@ -13,194 +13,158 @@ See the License for the specific language governing permissions and limitations under the License. """ -# pylint: disable=protected-access from collections import OrderedDict import json +import os -from mock import call, mock_open, patch +from mock import ANY, call, MagicMock, patch +from moto import mock_dynamodb2 from nose.tools import ( assert_equal, + assert_false, assert_is_instance, - assert_list_equal, + assert_is_none, assert_true ) -import stream_alert.alert_processor as ap -from stream_alert.alert_processor.main import _load_output_config, _sort_dict, handler -from tests.unit.stream_alert_alert_processor import FUNCTION_NAME, REGION -from tests.unit.stream_alert_alert_processor.helpers import get_alert, get_mock_context - - -@patch('stream_alert.alert_processor.main.run') -def test_handler_run(run_mock): - """Main handler `run` call params""" - context = get_mock_context() - handler(None, context) - - # This test will load the actual config, so we should compare the - # function call against the same config here. - run_mock.assert_called_with(None, REGION, '5'*12, FUNCTION_NAME, _load_output_config()) - - -@patch('logging.Logger.error') -def test_bad_config(log_mock): - """Load output config - bad config""" - mock = mock_open(read_data='non-json string that will log an error') - with patch('__builtin__.open', mock): - handler(None, None) - - log_mock.assert_called_with( - 'The \'%s\' file could not be loaded into json', - 'conf/outputs.json') - - -def test_handler_return(): - """Main handler return value""" - context = get_mock_context() - event = {'Records': []} - value = handler(event, context) - - assert_is_instance(value, list) - - -def test_load_output_config(): - """Load outputs configuration file""" - config = _load_output_config('tests/unit/conf/outputs.json') - - assert_equal(set(config.keys()), { - 'aws-firehose', 'aws-s3', 'aws-lambda', 'aws-sns', 'aws-sqs', - 'pagerduty', 'phantom', 'slack' - }) - - -def test_sort_dict(): - """Sorted Dict""" - dict_to_sort = {'c': 100, 'd': 1000, 'a': 1, 'b': 10, 'e': 100, 'f': 10, 'g': 1} - sorted_dict = _sort_dict(dict_to_sort) - - assert_equal(type(sorted_dict), OrderedDict) - - keys = ['a', 'b', 'c', 'd', 'e', 'f', 'g'] - for index, key in enumerate(sorted_dict.keys()): - assert_equal(keys[index], key) - - -def test_sort_dict_recursive(): - """Sorted Dict Recursion""" - dict_to_sort = {'c': 100, 'a': 1, 'b': {'b': 10, 'c': 1000, 'a': 1}} - sorted_dict = _sort_dict(dict_to_sort) - - assert_equal(type(sorted_dict), OrderedDict) - assert_equal(type(sorted_dict['b']), OrderedDict) - - sub_keys = ['a', 'b', 'c'] - for index, key in enumerate(sorted_dict['b'].keys()): - assert_equal(sub_keys[index], key) - - -@patch('requests.post') -@patch('stream_alert.alert_processor.main._load_output_config') -@patch('stream_alert.alert_processor.outputs.output_base.OutputDispatcher._load_creds') -def test_running_success(creds_mock, config_mock, get_mock): - """Alert Processor run handler - success""" - config_mock.return_value = _load_output_config('tests/unit/conf/outputs.json') - creds_mock.return_value = {'url': 'http://mock.url'} - get_mock.return_value.status_code = 200 - - alert = get_alert() - context = get_mock_context() - - result = handler(alert, context) - assert_is_instance(result, list) - - assert_true(result[0][0]) - - -@patch('logging.Logger.error') -@patch('stream_alert.alert_processor.main._load_output_config') -def test_running_bad_output(config_mock, log_mock): - """Alert Processor run handler - bad output""" - config_mock.return_value = _load_output_config('tests/unit/conf/outputs.json') - - alert = get_alert() - alert['outputs'] = ['slack'] - context = get_mock_context() - - handler(alert, context) - - log_mock.assert_called_with( - 'Improperly formatted output [%s]. Outputs for rules must ' - 'be declared with both a service and a descriptor for the ' - 'integration (ie: \'slack:my_channel\')', 'slack') - - alert['outputs'] = ['slakc:test'] - - handler(alert, context) - - log_mock.assert_called_with('The output \'%s\' does not exist!', 'slakc:test') - - -@patch('stream_alert.alert_processor.main._load_output_config') -@patch('stream_alert.alert_processor.outputs.output_base.StreamAlertOutput.get_dispatcher') -def test_running_no_dispatcher(dispatch_mock, config_mock): - """Alert Processor - Run Handler With No Dispatcher""" - config_mock.return_value = _load_output_config('tests/unit/conf/outputs.json') - dispatch_mock.return_value = None - - alert = get_alert() - context = get_mock_context() - - result = handler(alert, context) - - assert_is_instance(result, list) - assert_list_equal(result, []) - - -@patch('logging.Logger.exception') -@patch('requests.get') -@patch('stream_alert.alert_processor.main._load_output_config') -@patch('stream_alert.alert_processor.outputs.output_base.StreamAlertOutput.create_dispatcher') -@patch('stream_alert.alert_processor.outputs.output_base.OutputDispatcher._load_creds') -def test_running_exception_occurred(creds_mock, dispatch_mock, config_mock, get_mock, log_mock): - """Alert Processor - Run Handler, Exception Occurred""" - # Use TypeError as the mock's side_effect - err = TypeError('bad error') - creds_mock.return_value = {'url': 'mock.url'} - dispatch_mock.return_value.dispatch.side_effect = err - config_mock.return_value = _load_output_config('tests/unit/conf/outputs.json') - get_mock.return_value.status_code = 200 - - alert = _sort_dict(get_alert()) - context = get_mock_context() - - handler(alert, context) - - log_mock.assert_called_with( - 'An error occurred while sending alert ' - 'to %s:%s: %s. alert:\n%s', 'slack', 'unit_test_channel', - err, json.dumps(alert, indent=2)) - - -@patch('stream_alert.alert_processor.LOGGER.error') -def test_init_logging_bad(log_mock): - """Alert Processor Init - Logging, Bad Level""" - with patch.dict('os.environ', {'LOGGER_LEVEL': 'IFNO'}): - - # Force reload the alert_processor package to trigger the init - reload(ap) - - message = str(call('Defaulting to INFO logging: %s', - ValueError('Unknown level: \'IFNO\'',))) - - assert_equal(str(log_mock.call_args_list[0]), message) - - -@patch('stream_alert.alert_processor.LOGGER.setLevel') -def test_init_logging_int_level(log_mock): - """Alert Processor Init - Logging, Integer Level""" - with patch.dict('os.environ', {'LOGGER_LEVEL': '10'}): - - # Force reload the alert_processor package to trigger the init - reload(ap) - - log_mock.assert_called_with(10) +from stream_alert.alert_processor.main import AlertProcessor, handler +from stream_alert.alert_processor.outputs.output_base import OutputDispatcher +from tests.unit.stream_alert_alert_processor import ( + ACCOUNT_ID, ALERTS_TABLE, FUNCTION_NAME, OUTPUT_CONFIG_PATH, PREFIX, REGION) + +_ARN = 'arn:aws:lambda:{}:{}:function:{}:production'.format(REGION, ACCOUNT_ID, FUNCTION_NAME) +_EVENT = { + 'AlertID': '00-0-0-00', + 'Cluster': 'corp', + 'Created': '2018-03-14T00:00:00.000Z', + 'LogSource': 'carbonblack:binarystore.file.added', + 'LogType': 'json', + 'Outputs': ['slack:unit_test_channel'], + 'Record': json.dumps({ + 'file_path': '/tmp/file.zip', + 'md5': 'ABC' + }), + 'RuleDescription': 'Info about this rule and what actions to take', + 'RuleName': 'cb_binarystore_file_added', + 'SourceEntity': 'bucket.name', + 'SourceService': 's3' +} + + +@mock_dynamodb2 +@patch.dict(os.environ, {'AWS_DEFAULT_REGION': 'us-east-1', 'ALERTS_TABLE': ALERTS_TABLE}) +@patch.object(AlertProcessor, 'BACKOFF_MAX_TRIES', 1) +@patch.object(AlertProcessor, 'OUTPUT_CONFIG_PATH', OUTPUT_CONFIG_PATH) +class TestAlertProcessor(object): + """Tests for alert_processor/main.py""" + # pylint: disable=no-self-use,protected-access + + def test_init(self): + """Alert Processor - Initialization""" + processor = AlertProcessor(_ARN) + assert_is_instance(processor.config, dict) + assert_equal(processor.region, REGION) + assert_equal(processor.account_id, ACCOUNT_ID) + assert_equal(processor.prefix, PREFIX) + assert_equal(processor.alerts_table.table_name, ALERTS_TABLE) + + def test_build_alert_payload(self): + """Alert Processor - Building the Alert Payload""" + payload = AlertProcessor._build_alert_payload(_EVENT) + assert_is_instance(payload, OrderedDict) + expected_keys = { + 'cluster', 'created', 'id', 'log_source', 'log_type', 'outputs', 'record', + 'rule_description', 'rule_name', 'source_entity', 'source_service' + } + assert_equal(expected_keys, set(payload.keys())) + + @patch('stream_alert.alert_processor.main.LOGGER') + def test_create_dispatcher_invalid(self, mock_logger): + """Alert Processor - Create Dispatcher - Invalid Output""" + processor = AlertProcessor(_ARN) + assert_is_none(processor._create_dispatcher('helloworld')) + mock_logger.error.called_once_with(ANY, 'helloworld') + + @patch('stream_alert.alert_processor.main.LOGGER') + def test_create_dispatcher_output_doesnt_exist(self, mock_logger): + """Alert Processor - Create Dispatcher - Output Does Not Exist""" + processor = AlertProcessor(_ARN) + assert_is_none(processor._create_dispatcher('slack:no-such-channel')) + mock_logger.error.called_once_with( + 'The output \'%s\' does not exist!', 'slack:no-such-channel') + + def test_create_dispatcher(self): + """Alert Processor - Create Dispatcher - Success""" + processor = AlertProcessor(_ARN) + dispatcher = processor._create_dispatcher('aws-s3:unit_test_bucket') + assert_is_instance(dispatcher, OutputDispatcher) + + @patch('stream_alert.alert_processor.main.LOGGER') + def test_send_alert_exception(self, mock_logger): + """Alert Processor - Send Alert - Exception""" + dispatcher = MagicMock() + dispatcher.dispatch.side_effect = AttributeError + alert = {'id': '123', 'rule_name': 'hello_world', 'record': {'abc': 123}} + + assert_false(AlertProcessor._send_alert(alert, 'slack:unit_test_channel', dispatcher)) + mock_logger.assert_has_calls([ + call.info('Sending alert %s to %s', '123', 'slack:unit_test_channel'), + call.exception('Exception when sending alert %s to %s. Alert:\n%s', + '123', 'slack:unit_test_channel', ANY) + ]) + + @patch('stream_alert.alert_processor.main.LOGGER') + def test_send_alert(self, mock_logger): + """Alert Processor - Send Alert - Success""" + dispatcher = MagicMock() + dispatcher.dispatch.return_value = True + alert = {'id': '123', 'rule_name': 'hello_world', 'record': {'abc': 123}} + + assert_true(AlertProcessor._send_alert(alert, 'slack:unit_test_channel', dispatcher)) + dispatcher.dispatch.assert_called_once_with( + descriptor='unit_test_channel', rule_name='hello_world', alert=alert) + mock_logger.info.assert_called_once_with( + 'Sending alert %s to %s', '123', 'slack:unit_test_channel') + + def test_update_alerts_table_delete(self): + """Alert Processor - Update Alerts Table - Delete Item""" + processor = AlertProcessor(_ARN) + processor.alerts_table.delete_item = MagicMock() + + processor._update_alerts_table('name', 'id', {'out1': True, 'out2': True}) + processor.alerts_table.delete_item.assert_called_once_with( + Key={'RuleName': 'name', 'AlertID': 'id'} + ) + + def test_update_alerts_table_update(self): + """Alert Processor - Update Alerts Table - Update With Failed Outputs""" + processor = AlertProcessor(_ARN) + processor.alerts_table.update_item = MagicMock() + + processor._update_alerts_table('name', 'id', {'out1': True, 'out2': False, 'out3': False}) + processor.alerts_table.update_item.assert_called_once_with( + Key={'RuleName': 'name', 'AlertID': 'id'}, + UpdateExpression='SET RetryOutputs = :failed_outputs', + ExpressionAttributeValues={':failed_outputs': {'out2', 'out3'}} + ) + + @patch.object(AlertProcessor, '_send_alert', return_value=True) + @patch.object(AlertProcessor, '_update_alerts_table') + def test_run(self, mock_send_alert, mock_update_table): + """Alert Processor - Run""" + processor = AlertProcessor(_ARN) + results = processor.run(_EVENT) + + mock_send_alert.assert_called_once() + mock_update_table.assert_called_once() + assert_equal({'slack:unit_test_channel': True}, results) + + @patch.object(AlertProcessor, 'run', return_value={'out1': True}) + def test_handler(self, mock_run): + """Alert Processor - Lambda Handler""" + context = MagicMock() + context.invoked_function_arn = _ARN + + result = handler(_EVENT, context) + mock_run.assert_called_once_with(_EVENT) + assert_equal({'out1': True}, result) diff --git a/tests/unit/stream_alert_alert_processor/test_outputs/__init__.py b/tests/unit/stream_alert_alert_processor/test_outputs/__init__.py index 48564fa87..e69de29bb 100644 --- a/tests/unit/stream_alert_alert_processor/test_outputs/__init__.py +++ b/tests/unit/stream_alert_alert_processor/test_outputs/__init__.py @@ -1,21 +0,0 @@ -''' -Copyright 2017-present, Airbnb Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -''' -from stream_alert.alert_processor.main import _load_output_config - -REGION = 'us-east-1' -FUNCTION_NAME = 'corp-prefix_prod_streamalert_alert_processor' -CONFIG = _load_output_config('tests/unit/conf/outputs.json') -KMS_ALIAS = 'alias/stream_alert_secrets_test' diff --git a/tests/unit/stream_alert_alert_processor/test_outputs/test_output_base.py b/tests/unit/stream_alert_alert_processor/test_outputs/test_output_base.py index 3438e0f22..4b682299a 100644 --- a/tests/unit/stream_alert_alert_processor/test_outputs/test_output_base.py +++ b/tests/unit/stream_alert_alert_processor/test_outputs/test_output_base.py @@ -85,6 +85,7 @@ def test_user_defined_properties(): # The user defined properties should at a minimum contain a descriptor assert_is_not_none(props.get('descriptor')) + def test_output_loading(): """OutputDispatcher - Loading Output Classes""" loaded_outputs = set(StreamAlertOutput.get_all_outputs()) @@ -122,11 +123,6 @@ def test_local_temp_dir(self): temp_dir = self._dispatcher._local_temp_dir() assert_equal(temp_dir.split('/')[-1], 'stream_alert_secrets') - def test_get_secrets_bucket_name(self): - """OutputDispatcher - Get Secrets Bucket Name""" - bucket_name = self._dispatcher._get_secrets_bucket_name(FUNCTION_NAME) - assert_equal(bucket_name, 'corp-prefix.streamalert.secrets') - def test_output_cred_name(self): """OutputDispatcher - Output Cred Name""" output_name = self._dispatcher.output_cred_name('creds') From 50a226d6f52400735873abf1bfc92b490372207c Mon Sep 17 00:00:00 2001 From: Austin Byers Date: Tue, 20 Mar 2018 16:17:53 -0700 Subject: [PATCH 4/6] Address feedback --- stream_alert/alert_merger/main.py | 22 ++++++++++-------- stream_alert/alert_processor/main.py | 14 +++++++---- .../stream_alert_alert_merger/test_main.py | 23 ++++++++++++++----- 3 files changed, 39 insertions(+), 20 deletions(-) diff --git a/stream_alert/alert_merger/main.py b/stream_alert/alert_merger/main.py index 0ed96706f..bebf9cd9b 100644 --- a/stream_alert/alert_merger/main.py +++ b/stream_alert/alert_merger/main.py @@ -25,8 +25,6 @@ from boto3.dynamodb.conditions import Attr, Key from botocore.exceptions import ClientError -MERGER = None # Cache the instantiation of the AlertMerger for the life of the Lambda container. - class AlertTable(object): """Provides convenience methods for accessing and modifying the alerts table.""" @@ -98,9 +96,9 @@ def mark_as_dispatched(self, rule_name, alert_id): ConditionExpression='attribute_exists(AlertID)' ) except ClientError as error: - if error.response['Error']['Code'] == 'ConditionalCheckFailedException': - LOGGER.warn('Conditional update failed: %s', error.response['Error']['Message']) - else: + # The update will fail if the alert was already deleted by the alert processor, + # in which case there's nothing to do! Any other error is re-raised. + if error.response['Error']['Code'] != 'ConditionalCheckFailedException': raise @@ -117,6 +115,15 @@ def default(self, obj): # pylint: disable=arguments-differ,method-hidden # TODO: Alert merging will be implemented here class AlertMerger(object): """Dispatch alerts to the alert processor.""" + ALERT_MERGER = None # AlertMerger instance which can be re-used across Lambda invocations + + @classmethod + def get_instance(cls): + """Get an instance of the AlertMerger, using a cached version if possible.""" + if not cls.ALERT_MERGER: + cls.ALERT_MERGER = AlertMerger() + return cls.ALERT_MERGER + def __init__(self): self.alerts_db = AlertTable(os.environ['ALERTS_TABLE']) self.alert_proc = os.environ['ALERT_PROCESSOR'] @@ -145,7 +152,4 @@ def dispatch(self): def handler(event, context): # pylint: disable=unused-argument """Entry point for the alert merger.""" - global MERGER # pylint: disable=global-statement - if not MERGER: - MERGER = AlertMerger() - MERGER.dispatch() + AlertMerger.get_instance().dispatch() diff --git a/stream_alert/alert_processor/main.py b/stream_alert/alert_processor/main.py index 09742d68f..84fdef770 100644 --- a/stream_alert/alert_processor/main.py +++ b/stream_alert/alert_processor/main.py @@ -31,9 +31,17 @@ class AlertProcessor(object): """Orchestrates delivery of alerts to the appropriate dispatchers.""" + ALERT_PROCESSOR = None # AlertProcessor instance which can be re-used across Lambda invocations BACKOFF_MAX_TRIES = 6 OUTPUT_CONFIG_PATH = 'conf/outputs.json' + @classmethod + def get_instance(cls, invoked_function_arn): + """Get an instance of the AlertProcessor, using a cached version if possible.""" + if not cls.ALERT_PROCESSOR: + cls.ALERT_PROCESSOR = AlertProcessor(invoked_function_arn) + return cls.ALERT_PROCESSOR + def __init__(self, invoked_function_arn): """Initialization logic that can be cached across invocations. @@ -192,8 +200,4 @@ def handler(event, context): For example, {'aws-firehose:sample': False, 'slack:example-channel': True}. NOTE: Invalid outputs are excluded from the result (they should not be retried) """ - global ALERT_PROCESSOR # pylint: disable=global-statement - if not ALERT_PROCESSOR: - # Create the alert processor if we haven't already - ALERT_PROCESSOR = AlertProcessor(context.invoked_function_arn) - return ALERT_PROCESSOR.run(event) + return AlertProcessor.get_instance(context.invoked_function_arn).run(event) diff --git a/tests/unit/stream_alert_alert_merger/test_main.py b/tests/unit/stream_alert_alert_merger/test_main.py index 5a0a7d79e..ba05a61a3 100644 --- a/tests/unit/stream_alert_alert_merger/test_main.py +++ b/tests/unit/stream_alert_alert_merger/test_main.py @@ -94,11 +94,22 @@ def test_mark_as_dispatched(self): assert_equal(Decimal('1'), item['Attempts']) assert_is_instance(item['Dispatched'], Decimal) - @patch.object(main, 'LOGGER') - def test_mark_as_dispatched_conditional_fail(self, mock_logger): + def test_mark_as_dispatched_conditional_fail(self): """Alert Merger - Alert Table - Dispatched Alert is Already Deleted""" - self.alert_table.mark_as_dispatched('nonsense', 'bears') - mock_logger.warn.assert_called_once_with('Conditional update failed: %s', ANY) + self.alert_table.table = MagicMock() + + def mock_update(**kwargs): # pylint: disable=unused-argument + raise ClientError({'Error': {'Code': 'ConditionalCheckFailedException'}}, 'UpdateItem') + self.alert_table.table.update_item.side_effect = mock_update + + # No error should be raised if the conditional delete failed + self.alert_table.mark_as_dispatched('rule_name', 'alert_id') + self.alert_table.table.update_item.assert_called_once_with( + Key={'RuleName': 'rule_name', 'AlertID': 'alert_id'}, + UpdateExpression='SET Dispatched = :now ADD Attempts :one', + ExpressionAttributeValues={':now': ANY, ':one': 1}, + ConditionExpression='attribute_exists(AlertID)' + ) def test_mark_as_dispatched_exception(self): """Alert Merger - Alert Table - An Unhandled Exception is Re-Raised""" @@ -158,6 +169,6 @@ def test_handler(mock_instance): """Alert Merger - Handler (Entry Point)""" main.handler(None, None) mock_instance.assert_has_calls([ - call(), - call().dispatch() + call.get_instance(), + call.get_instance().dispatch() ]) From 8cb1373773bc4f44b00ee77c74325380b3e2cb88 Mon Sep 17 00:00:00 2001 From: Austin Byers Date: Tue, 20 Mar 2018 17:31:46 -0700 Subject: [PATCH 5/6] Add AlertAttempts metric to alert merger --- conf/lambda.json | 1 + stream_alert/alert_merger/main.py | 5 +++- stream_alert/shared/metrics.py | 14 +++++++++-- stream_alert_cli/terraform/alert_merger.py | 2 +- stream_alert_cli/terraform/lambda_module.py | 24 +++++++++++++++++-- stream_alert_cli/terraform/metrics.py | 4 ---- terraform/modules/tf_lambda/cloudwatch.tf | 16 ++++++++++++- terraform/modules/tf_lambda/variables.tf | 11 +++++++++ .../terraform/test_alert_processor.py | 2 ++ .../unit/stream_alert_shared/test_metrics.py | 9 ------- 10 files changed, 68 insertions(+), 20 deletions(-) diff --git a/conf/lambda.json b/conf/lambda.json index 33480cbd0..c2f0933ea 100644 --- a/conf/lambda.json +++ b/conf/lambda.json @@ -2,6 +2,7 @@ "alert_merger_config": { "concurrency_limit": 1, "current_version": "$LATEST", + "enable_metrics": true, "handler": "stream_alert.alert_merger.main.handler", "log_level": "info", "log_retention_days": 14, diff --git a/stream_alert/alert_merger/main.py b/stream_alert/alert_merger/main.py index bebf9cd9b..89e3dece3 100644 --- a/stream_alert/alert_merger/main.py +++ b/stream_alert/alert_merger/main.py @@ -20,6 +20,7 @@ import time from stream_alert.alert_merger import LOGGER +from stream_alert.shared.metrics import ALERT_MERGER_NAME, MetricLogger import boto3 from boto3.dynamodb.conditions import Attr, Key @@ -132,8 +133,10 @@ def __init__(self): def _dispatch_alert(self, alert): """Dispatch all alerts which need to be sent to the rule processor.""" + this_attempt_num = alert.get('Attempts', 0) + 1 LOGGER.info('Dispatching alert %s to %s (attempt %d)', - alert['AlertID'], self.alert_proc, alert.get('Attempts', 0) + 1) + alert['AlertID'], self.alert_proc, this_attempt_num) + MetricLogger.log_metric(ALERT_MERGER_NAME, MetricLogger.ALERT_ATTEMPTS, this_attempt_num) self.lambda_client.invoke( FunctionName=self.alert_proc, diff --git a/stream_alert/shared/metrics.py b/stream_alert/shared/metrics.py index 87c7850a3..40dca6995 100644 --- a/stream_alert/shared/metrics.py +++ b/stream_alert/shared/metrics.py @@ -16,6 +16,7 @@ import os from stream_alert.shared import ( + ALERT_MERGER_NAME, ALERT_PROCESSOR_NAME, ATHENA_PARTITION_REFRESH_NAME, LOGGER, @@ -27,7 +28,10 @@ # The FUNC_PREFIXES dict acts as a simple map to a human-readable name # Add ATHENA_PARTITION_REFRESH_NAME: 'AthenaPartitionRefresh', to the # below when metrics are supported there -FUNC_PREFIXES = {RULE_PROCESSOR_NAME: 'RuleProcessor'} +FUNC_PREFIXES = { + ALERT_MERGER_NAME: 'AlertMerger', + RULE_PROCESSOR_NAME: 'RuleProcessor' +} try: ENABLE_METRICS = bool(int(os.environ.get('ENABLE_METRICS', 0))) @@ -47,7 +51,7 @@ class MetricLogger(object): accessing properties and avoids doing dict lookups a ton. """ - # Constant metric names used for CloudWatch + # Rule Processor metric names FAILED_PARSES = 'FailedParses' S3_DOWNLOAD_TIME = 'S3DownloadTime' TOTAL_PROCESSED_SIZE = 'TotalProcessedSize' @@ -60,6 +64,9 @@ class MetricLogger(object): FIREHOSE_FAILED_RECORDS = 'FirehoseFailedRecords' NORMALIZED_RECORDS = 'NormalizedRecords' + # Alert Merger metric names + ALERT_ATTEMPTS = 'AlertAttempts' + _default_filter = '{{ $.metric_name = "{}" }}' _default_value_lookup = '$.metric_value' @@ -70,6 +77,9 @@ class MetricLogger(object): # If additional metric logging is added that does not conform to this default # configuration, new filters & lookups should be created to handle them as well. _available_metrics = { + ALERT_MERGER_NAME: { + ALERT_ATTEMPTS: (_default_filter.format(ALERT_ATTEMPTS), _default_value_lookup) + }, ALERT_PROCESSOR_NAME: {}, # Placeholder for future alert processor metrics ATHENA_PARTITION_REFRESH_NAME: {}, # Placeholder for future athena processor metrics RULE_PROCESSOR_NAME: { diff --git a/stream_alert_cli/terraform/alert_merger.py b/stream_alert_cli/terraform/alert_merger.py index 854a6e50c..dc527d50c 100644 --- a/stream_alert_cli/terraform/alert_merger.py +++ b/stream_alert_cli/terraform/alert_merger.py @@ -40,7 +40,7 @@ def generate_alert_merger(config): ALERT_MERGER_NAME, config, { 'ALERTS_TABLE': '{}_streamalert_alerts'.format(prefix), 'ALERT_PROCESSOR': '{}_streamalert_alert_processor'.format(prefix), - 'ALERT_PROCESSOR_TIMEOUT_SEC': config['lambda']['alert_processor_config']['timeout'] + 'ALERT_PROCESSOR_TIMEOUT_SEC': config['lambda']['alert_processor_config']['timeout'], } ) diff --git a/stream_alert_cli/terraform/lambda_module.py b/stream_alert_cli/terraform/lambda_module.py index 44b297bb8..2252f2ae2 100644 --- a/stream_alert_cli/terraform/lambda_module.py +++ b/stream_alert_cli/terraform/lambda_module.py @@ -13,6 +13,7 @@ limitations under the License. """ from stream_alert import shared +from stream_alert.shared import metrics from stream_alert_cli.terraform.common import monitoring_topic_arn @@ -48,6 +49,22 @@ def _tf_metric_alarms(lambda_config, sns_arn): return result +def _tf_metric_filters(lambda_config, function_name): + """Compute metric filter Terraform configuration from the Lambda config.""" + if not lambda_config.get('enable_metrics'): + return {} + + # Create a metric filter for each custom metric associated with this function. + metric_filters = [] + function_metrics = metrics.MetricLogger.get_available_metrics()[function_name] + for metric, settings in function_metrics.items(): + metric_name = '{}-{}'.format(metrics.FUNC_PREFIXES[function_name], metric) + filter_pattern, filter_value = settings + metric_filters.append('{},{},{}'.format(metric_name, filter_pattern, filter_value)) + + return {'log_metric_filters': metric_filters} + + def _tf_vpc_config(lambda_config): """Compute VPC configuration from the Lambda config.""" result = {} @@ -70,7 +87,7 @@ def generate_lambda(function_name, config, environment=None): function_name (str): Name of the Lambda function (e.g. 'alert_processor') config (dict): Parsed config from conf/ environment (dict): Optional environment variables to specify. - LOGGER_LEVEL is included automatically. + ENABLE_METRICS and LOGGER_LEVEL are included automatically. Example Lambda config: { @@ -115,6 +132,8 @@ def generate_lambda(function_name, config, environment=None): # Add logger level to any custom environment variables environment_variables = { + # Convert True/False to "1" or "0", respectively + 'ENABLE_METRICS': str(int(lambda_config.get('enable_metrics', False))), 'LOGGER_LEVEL': lambda_config.get('log_level', 'info') } if environment: @@ -139,8 +158,9 @@ def generate_lambda(function_name, config, environment=None): if key in lambda_config: lambda_module[key] = lambda_config[key] - # Add metric alarms to the Lambda module definition + # Add metric alarms and filters to the Lambda module definition lambda_module.update(_tf_metric_alarms(lambda_config, monitoring_topic_arn(config))) + lambda_module.update(_tf_metric_filters(lambda_config, function_name)) # Add VPC config to the Lambda module definition lambda_module.update(_tf_vpc_config(lambda_config)) diff --git a/stream_alert_cli/terraform/metrics.py b/stream_alert_cli/terraform/metrics.py index e63c080d2..256729836 100644 --- a/stream_alert_cli/terraform/metrics.py +++ b/stream_alert_cli/terraform/metrics.py @@ -36,10 +36,6 @@ def generate_cloudwatch_metric_filters(cluster_name, cluster_dict, config): continue if func not in stream_alert_config: - LOGGER_CLI.error( - 'Function for metrics \'%s\' is not defined in stream alert config. ' - 'Options are: %s', func, ', '.join( - '\'{}\''.format(key) for key in stream_alert_config)) continue if not stream_alert_config[func].get('enable_metrics'): diff --git a/terraform/modules/tf_lambda/cloudwatch.tf b/terraform/modules/tf_lambda/cloudwatch.tf index 7dfe46524..54f32bd03 100644 --- a/terraform/modules/tf_lambda/cloudwatch.tf +++ b/terraform/modules/tf_lambda/cloudwatch.tf @@ -21,7 +21,7 @@ resource "aws_cloudwatch_event_target" "invoke_lambda_no_vpc" { arn = "${aws_lambda_alias.alias_no_vpc.arn}" } -// CloudWatch log group with configurable retention and tagging +// CloudWatch log group with configurable retention, tagging, and metric filters resource "aws_cloudwatch_log_group" "lambda_log_group" { count = "${var.enabled}" @@ -33,6 +33,20 @@ resource "aws_cloudwatch_log_group" "lambda_log_group" { } } +// The split list is made up of: , , +resource "aws_cloudwatch_log_metric_filter" "rule_processor_cw_metric_filters" { + count = "${length(var.log_metric_filters)}" + name = "${element(split(",", var.log_metric_filters[count.index]), 0)}" + pattern = "${element(split(",", var.log_metric_filters[count.index]), 1)}" + log_group_name = "${aws_cloudwatch_log_group.lambda_log_group.name}" + + metric_transformation { + name = "${element(split(",", var.log_metric_filters[count.index]), 0)}" + namespace = "${var.log_metric_filter_namespace}" + value = "${element(split(",", var.log_metric_filters[count.index]), 2)}" + } +} + // Generic CloudWatch metric alarms related to this function resource "aws_cloudwatch_metric_alarm" "lambda_invocation_errors" { diff --git a/terraform/modules/tf_lambda/variables.tf b/terraform/modules/tf_lambda/variables.tf index 1c128b30f..cdf8b5c1f 100644 --- a/terraform/modules/tf_lambda/variables.tf +++ b/terraform/modules/tf_lambda/variables.tf @@ -97,6 +97,17 @@ variable "log_retention_days" { description = "CloudWatch logs for the Lambda function will be retained for this many days" } +variable "log_metric_filter_namespace" { + default = "StreamAlert" + description = "Namespace for metrics generated from metric filters" +} + +variable "log_metric_filters" { + type = "list" + default = [] + description = "Metric filters applied to the log group. Each filter should be in the format \"filter_name,filter_pattern,value\"" +} + // ***** CloudWatch metric alarms ***** variable "alarm_actions" { diff --git a/tests/unit/stream_alert_cli/terraform/test_alert_processor.py b/tests/unit/stream_alert_cli/terraform/test_alert_processor.py index 5ac741dc5..9fcfb3006 100644 --- a/tests/unit/stream_alert_cli/terraform/test_alert_processor.py +++ b/tests/unit/stream_alert_cli/terraform/test_alert_processor.py @@ -55,6 +55,7 @@ def test_generate_all_options(self): 'description': 'StreamAlert Alert Processor', 'environment_variables': { 'ALERTS_TABLE': 'unit-testing_streamalert_alerts', + 'ENABLE_METRICS': '0', 'LOGGER_LEVEL': 'info' }, 'errors_alarm_enabled': True, @@ -110,6 +111,7 @@ def test_generate_minimal_options(self): 'description': 'StreamAlert Alert Processor', 'environment_variables': { 'ALERTS_TABLE': 'unit-testing_streamalert_alerts', + 'ENABLE_METRICS': '0', 'LOGGER_LEVEL': 'info' }, 'function_name': 'unit-testing_streamalert_alert_processor', diff --git a/tests/unit/stream_alert_shared/test_metrics.py b/tests/unit/stream_alert_shared/test_metrics.py index 250a904f7..0004499f2 100644 --- a/tests/unit/stream_alert_shared/test_metrics.py +++ b/tests/unit/stream_alert_shared/test_metrics.py @@ -31,15 +31,6 @@ def setup(self): # Force reload the metrics package to trigger env var loading reload(shared.metrics) - @patch('logging.Logger.error') - def test_invalid_metric_function(self, log_mock): - """Metrics - Invalid Function Name""" - shared.metrics.MetricLogger.log_metric('rule_procesor', '', '') - - log_mock.assert_called_with( - 'Function \'%s\' not defined in available metrics. ' - 'Options are: %s', 'rule_procesor', '\'rule_processor\'') - @patch('logging.Logger.error') def test_invalid_metric_name(self, log_mock): """Metrics - Invalid Metric Name""" From a53994f627259c5bf1e9f70c9b64bdc543319072 Mon Sep 17 00:00:00 2001 From: Austin Byers Date: Wed, 21 Mar 2018 11:32:06 -0700 Subject: [PATCH 6/6] Restore missing newlines --- stream_alert/alert_processor/main.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/stream_alert/alert_processor/main.py b/stream_alert/alert_processor/main.py index 388082cd0..b3ed31b4f 100644 --- a/stream_alert/alert_processor/main.py +++ b/stream_alert/alert_processor/main.py @@ -1,9 +1,12 @@ """ Copyright 2017-present, Airbnb Inc. + Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -41,6 +44,7 @@ def get_instance(cls, invoked_function_arn): def __init__(self, invoked_function_arn): """Initialization logic that can be cached across invocations. + Args: invoked_function_arn (str): The ARN of the alert processor when it was invoked. This is used to calculate region, account, and prefix. @@ -61,8 +65,10 @@ def __init__(self, invoked_function_arn): @staticmethod def _build_alert_payload(record): """Transform a raw Dynamo record into a payload ready for dispatching. + Args: record (dict): A row in the Dynamo alerts table + Returns: OrderedDict: An alert payload ready to be sent to output dispatchers. """ @@ -84,8 +90,10 @@ def _build_alert_payload(record): def _create_dispatcher(self, output): """Create a dispatcher for the given output. + Args: output (str): Alert output, e.g. "aws-sns:topic-name" + Returns: OutputDispatcher: Based on the output type. Returns None if the output is invalid or not defined in the config. @@ -108,6 +116,7 @@ def _create_dispatcher(self, output): @staticmethod def _send_alert(alert_payload, output, dispatcher): """Send a single alert to the given output. + Returns: bool: True if the alert was sent successfully. """ @@ -145,8 +154,10 @@ def _update_alerts_table(self, rule_name, alert_id, results): def run(self, event): """Run the alert processor! + Args: event (dict): Invocation event (record from Dynamo table) + Returns: dict(str, bool): Maps each output to whether it was sent successfully. Invalid outputs are excluded from the result. @@ -166,6 +177,7 @@ def run(self, event): def handler(event, context): """StreamAlert Alert Processor - entry point + Args: event (dict): Record from the alerts Dynamo table: { 'AlertID': str, # UUID @@ -184,6 +196,7 @@ def handler(event, context): 'SourceService': str, # Service which generated the alert (e.g. "sns", "slack") } context (AWSLambdaContext): Lambda invocation context + Returns: dict(str, bool): Maps each output to whether it was sent successfully. For example, {'aws-firehose:sample': False, 'slack:example-channel': True}.