[core] drop sns publish for lambda invoke #225

Merged 14 commits on Jul 18, 2017
5 changes: 5 additions & 0 deletions conf/global.json
@@ -9,5 +9,10 @@
"tfstate_bucket": "PREFIX_GOES_HERE.streamalert.terraform.state",
"tfstate_s3_key": "stream_alert_state/terraform.tfstate",
"tfvars": "terraform.tfvars"
},
"infrastructure": {
"monitoring": {
"create_sns_topic": true
}
}
}
38 changes: 38 additions & 0 deletions docs/source/clusters.rst
@@ -130,6 +130,44 @@ To disable CloudWatch alarms, set to ``false``.
}
}

To configure the SNS topic used to receive CloudWatch metric alarms, use one of the following options in the ``conf/global.json`` configuration file.

Option 1: Create a new topic. This tells the StreamAlert CLI to create a new topic called ``stream_alert_monitoring``. All clusters will send alarms to this topic.

.. code-block:: json

  {
    "account": {
      "...": "..."
    },
    "terraform": {
      "...": "..."
    },
    "infrastructure": {
      "monitoring": {
        "create_sns_topic": true
      }
    }
  }

Option 2: Use an existing SNS topic within your AWS account (created outside of the scope of StreamAlert).

.. code-block:: json

  {
    "account": {
      "...": "..."
    },
    "terraform": {
      "...": "..."
    },
    "infrastructure": {
      "monitoring": {
        "sns_topic_name": "my_sns_topic"
      }
    }
  }

Module: Kinesis Events
----------------------

104 changes: 104 additions & 0 deletions stream_alert/alert_processor/helpers.py
@@ -0,0 +1,104 @@
'''
Copyright 2017-present, Airbnb Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
'''
import logging

logging.basicConfig()
LOGGER = logging.getLogger('StreamAlertOutput')
LOGGER.setLevel(logging.DEBUG)

def validate_alert(alert):
    """Helper function to perform simple validation of an alert's keys and structure

    Args:
        alert [dict]: the alert record to test that should be in the form of a dict

    Returns:
        [bool] a boolean value indicating whether or not the alert has the proper structure
    """
    if not _validate_root(alert):
        return False

    metadata_keys = {'log', 'rule_name', 'rule_description', 'type', 'source', 'outputs'}
    if not set(alert['metadata'].keys()) == metadata_keys:
        LOGGER.error('The value of the \'metadata\' key must be a map (dict) '
                     'that contains the following keys: %s',
                     ', '.join('\'{}\''.format(key) for key in metadata_keys))
Contributor:

All of these comparisons in this file test for set equality (the keys must match the hard-coded set), but the error messages say it must contain such and such keys.

Just to mitigate potential confusion, can we either update the error messages to indicate it must contain exactly the required keys, or else update the tests to check for set inclusion instead of equality?

Contributor Author (@ryandeivert, Jul 17, 2017):

@austinbyers updating the logger messages to indicate that only those keys should be included. I'd rather have strict validation of the input than loose validation.
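The distinction under discussion can be sketched in a few lines (an illustrative Python 3 sketch with hypothetical helper names, not StreamAlert code):

```python
# Strict vs. loose key validation (illustrative sketch, not StreamAlert code).
required = {'log', 'rule_name', 'rule_description', 'type', 'source', 'outputs'}

def strict_keys(metadata):
    # Set equality: exactly the required keys, no extras allowed.
    return set(metadata) == required

def loose_keys(metadata):
    # Set inclusion: required keys must be present, extras are tolerated.
    return required.issubset(metadata)

metadata = dict.fromkeys(required, 'value')
print(strict_keys(metadata), loose_keys(metadata))  # True True

metadata['extra'] = 'value'
print(strict_keys(metadata), loose_keys(metadata))  # False True
```

The merged code keeps the strict form, so the error messages are the part that changes.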

        return False

    valid = True
    for key in metadata_keys:
        if key == 'source':
            if not (isinstance(alert['metadata'][key], dict) and
                    set(alert['metadata'][key].keys()) == {'service', 'entity'}):
                LOGGER.error('The value of the \'source\' key must be a map (dict) that '
                             'contains \'service\' and \'entity\' keys.')
                valid = False
                continue

            for sub_key, sub_value in alert['metadata'][key].items():
                if not isinstance(sub_value, (str, unicode)):
                    LOGGER.error('The value of the \'%s\' key within \'%s\' must be '
                                 'a string (str).', sub_key, key)
                    valid = False
                    continue
Contributor:

This is redundant (at the end of the inner for loop).

Contributor Author:

thanks! removed.


        elif key == 'outputs':
            if not (isinstance(alert['metadata'][key], list) and
                    alert['metadata'][key]):
                LOGGER.error(
                    'The value of the \'outputs\' key must be an array (list) that '
                    'contains at least one configured output.')
                valid = False
                continue

            for entry in alert['metadata'][key]:
                if not isinstance(entry, (str, unicode)):
                    LOGGER.error('The value of each entry in the \'outputs\' list '
                                 'must be a string (str).')
                    valid = False
                    continue
Contributor:

Redundant

Contributor Author:

thanks! removed.


        elif not isinstance(alert['metadata'][key], (str, unicode)):
            LOGGER.error('The value of the \'%s\' key must be a string (str), not %s',
                         key, type(alert['metadata'][key]))
            valid = False
            continue
Contributor:

Technically redundant, but if you want to keep this one to be consistent with all the other checks, that's fine

Contributor Author:

thanks! removed for brevity.


    return valid

def _validate_root(alert):
    """Private helper function to validate the root keys on an alert

    Args:
        alert [dict]: the alert record to test that should be in the form of a dict

    Returns:
        [bool] a boolean value indicating whether or not the expected root keys in
        the alert exist and have the proper values
    """
    if not (isinstance(alert, dict) and
            set(alert.keys()) == {'record', 'metadata'}):
        LOGGER.error('The alert must be a map (dict) that contains \'record\' '
                     'and \'metadata\' keys.')
        return False

    if not (isinstance(alert['record'], dict) and
            isinstance(alert['metadata'], dict)):
        LOGGER.error('The value of both the \'record\' and \'metadata\' keys '
                     'must be a map (dict).')
        return False

    return True
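For reference, an alert that passes these checks has the following shape (a sketch with made-up field values; the simplified looks_valid helper below only restates the key-set checks, not the full per-value type checks in validate_alert):

```python
# Sketch of an alert shaped the way validate_alert() expects
# (field values are illustrative, not real StreamAlert output).
alert = {
    'record': {'hostname': 'web-01', 'action': 'login'},
    'metadata': {
        'log': 'example:log_type',
        'rule_name': 'example_rule',
        'rule_description': 'Example rule description',
        'type': 'json',
        'source': {'service': 's3', 'entity': 'example-bucket'},
        'outputs': ['slack:sample_channel'],
    },
}

# Simplified Python 3 restatement of the key-set checks above.
def looks_valid(candidate):
    if not (isinstance(candidate, dict) and set(candidate) == {'record', 'metadata'}):
        return False
    meta = candidate['metadata']
    return (isinstance(meta, dict) and
            set(meta) == {'log', 'rule_name', 'rule_description',
                          'type', 'source', 'outputs'} and
            set(meta['source']) == {'service', 'entity'})

print(looks_valid(alert))  # True
```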
82 changes: 30 additions & 52 deletions stream_alert/alert_processor/main.py
@@ -18,12 +18,14 @@

from collections import OrderedDict

from stream_alert.alert_processor.helpers import validate_alert
from stream_alert.alert_processor.outputs import get_output_dispatcher

logging.basicConfig()
LOGGER = logging.getLogger('StreamAlertOutput')
LOGGER.setLevel(logging.DEBUG)


def handler(event, context):
    """StreamAlert Alert Processor

@@ -40,68 +42,39 @@ def handler(event, context):
        indicates if sending was successful and the second value is the
        output configuration info (i.e. 'slack:sample_channel')
    """
    records = event.get('Records', [])
    LOGGER.info('Running alert processor for %d records', len(records))

    # A failure to load the config will log the error in load_output_config and return here
    # A failure to load the config will log the error in load_output_config
    # and return here
    config = _load_output_config()
    if not config:
        return

    region = context.invoked_function_arn.split(':')[3]
    function_name = context.function_name

    status_values = []

    for record in records:
        sns_payload = record.get('Sns')
        if not sns_payload:
            continue

        sns_message = sns_payload['Message']
        try:
            loaded_sns_message = json.loads(sns_message)
        except ValueError as err:
            LOGGER.error('An error occurred while decoding message to JSON: %s', err)
            continue
    # Return the current list of statuses back to the caller
    return [status for status in run(event, region, function_name, config)]
Contributor:

Would return list(run(event, region, function_name, config)) do the trick?

Contributor Author:

Same effect, yes. I'll update with explicit list casting.
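The two forms are indeed equivalent for a generator such as run(); a quick illustration with a stand-in generator (not the real run()):

```python
def fake_run():
    # Stand-in for run(), which yields (sent, output) tuples.
    yield (True, 'slack:sample_channel')
    yield (False, 'pagerduty:sample_integration')

# Both exhaust the generator into the same list.
assert [status for status in fake_run()] == list(fake_run())
```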


        if not 'default' in loaded_sns_message:
            # do not log for messages related to alarms
            if not 'AlarmName' in loaded_sns_message:
                LOGGER.error('Malformed SNS: %s', loaded_sns_message)
                continue

        status_values.extend(run(loaded_sns_message, region, function_name, config))

    # Return the current status back to the caller
    return status_values

def run(loaded_sns_message, region, function_name, config):
def run(alert, region, function_name, config):
"""Send an Alert to its described outputs.

Args:
loaded_sns_message [dict]: SNS message dictionary with the following structure:

{
'default': alert
}

The alert is another dict with the following structure:

{
'record': record,
'metadata': {
'rule_name': rule.rule_name,
'rule_description': rule.rule_function.__doc__,
'log': str(payload.log_source),
'outputs': rule.outputs,
'type': payload.type,
'source': {
'service': payload.service,
'entity': payload.entity
alert [dict]: dictionary representing an alert with the
following structure:

{
'record': record,
'metadata': {
'rule_name': rule.rule_name,
'rule_description': rule.rule_function.__doc__,
'log': str(payload.log_source),
'outputs': rule.outputs,
'type': payload.type,
'source': {
'service': payload.service,
'entity': payload.entity
}
}
}
}

region [string]: the AWS region being used
Contributor:

let's update this to: The AWS region of the currently executing Lambda function

function_name [string]: the name of the lambda function
@@ -110,8 +83,11 @@ def run(loaded_sns_message, region, function_name, config):
Returns:
[generator] yields back dispatch status and name of the output to the handler
"""
    LOGGER.debug(loaded_sns_message)
    alert = loaded_sns_message['default']
    if not validate_alert(alert):
        LOGGER.error('Invalid alert:\n%s', json.dumps(alert, indent=2))
        return

    LOGGER.debug('Sending alert to outputs:\n%s', json.dumps(alert, indent=2))
    rule_name = alert['metadata']['rule_name']

    # strip out unnecessary keys and sort
@@ -154,6 +130,7 @@ def run(loaded_sns_message, region, function_name, config):
        # Yield back the result to the handler
        yield sent, output


def _sort_dict(unordered_dict):
    """Recursively sort a dictionary

@@ -173,6 +150,7 @@ def _sort_dict(unordered_dict):

    return result


def _load_output_config(config_path='conf/outputs.json'):
    """Load the outputs configuration file from disk

@@ -183,7 +161,7 @@ def _load_output_config(config_path='conf/outputs.json'):
    try:
        config = json.load(outputs)
    except ValueError:
        LOGGER.error('The conf/outputs.json file could not be loaded into json')
        LOGGER.error('The \'%s\' file could not be loaded into json', config_path)
        return

    return config
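The net payload change this PR makes can be sketched as follows: previously the alert arrived JSON-encoded inside an SNS envelope under a 'default' key, while now the alert dict is the Lambda invoke event itself (field values below are illustrative, not real StreamAlert output):

```python
import json

# Illustrative alert (not real StreamAlert output)
alert = {
    'record': {'hostname': 'web-01'},
    'metadata': {'rule_name': 'example_rule'},
}

# Old shape: SNS event wrapping a JSON-encoded {'default': alert} message
sns_event = {
    'Records': [{'Sns': {'Message': json.dumps({'default': alert})}}],
}
old_alert = json.loads(sns_event['Records'][0]['Sns']['Message'])['default']

# New shape: the Lambda invoke payload is the alert itself
new_event = alert

assert old_alert == new_event  # same alert, one less layer of wrapping
```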