Commit 742599c
[lambda][rule] processed logs will now be sent to the alert processor as the rule processor parses them

ryandeivert committed May 5, 2017
1 parent 7cedf2a commit 742599c
Showing 8 changed files with 96 additions and 78 deletions.
3 changes: 2 additions & 1 deletion stream_alert/rule_processor/config.py
@@ -93,7 +93,7 @@ def load_env(context):
     Returns:
         {'lambda_function_name': 'function_name',
-         'lambda_alias': 'staging|development|production'}
+         'lambda_alias': 'development|production'}
     """
     env = {}
     if context:
@@ -104,4 +104,5 @@ def load_env(context):
         env['lambda_alias'] = arn[7]
     else:
         env['lambda_alias'] = 'development'
+        env['lambda_region'] = 'us-east-1'
     return env
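
For reference, a minimal sketch of how load_env() pulls these values out of the context ARN. The ARN below is made up, and arn[3] holding the region is an assumption based on the standard Lambda ARN layout; the committed code only reads arn[6] and arn[7]:

# Hypothetical ARN: arn:aws:lambda:us-east-1:123456789012:function:rule_processor:production
arn = 'arn:aws:lambda:us-east-1:123456789012:function:rule_processor:production'.split(':')
env = {
    'lambda_function_name': arn[6],  # 'rule_processor'
    'lambda_alias': arn[7],          # 'production'
    'lambda_region': arn[3],         # 'us-east-1' (assumed ARN position, see note above)
}
print(env)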
90 changes: 53 additions & 37 deletions stream_alert/rule_processor/handler.py
@@ -9,24 +9,29 @@
 from stream_alert.rule_processor.sink import StreamSink

 logging.basicConfig()
-level = os.environ.get('LOGGER_LEVEL', 'INFO')
+LEVEL = os.environ.get('LOGGER_LEVEL', 'INFO')
 LOGGER = logging.getLogger('StreamAlert')
-LOGGER.setLevel(level.upper())
+LOGGER.setLevel(LEVEL.upper())


 class StreamAlert(object):
     """Wrapper class for handling all StreamAlert classification and processing"""
-    def __init__(self, **kwargs):
+    def __init__(self, context, return_alerts=False):
         """
         Args:
+            context: An AWS context object which provides metadata on the currently
+                executing lambda function.
             return_alerts: If the user wants to handle the sinking
                 of alerts to external endpoints, return a list of
                 generated alerts.
         """
-        self.return_alerts = kwargs.get('return_alerts')
+        self.return_alerts = return_alerts
+        self.env = load_env(context)
+        # Instantiate the sink here to handle sending the triggered alerts to the alert processor
+        self.sinker = StreamSink(self.env)
         self.alerts = []

-    def run(self, event, context):
+    def run(self, event):
         """StreamAlert Lambda function handler.
         Loads the configuration for the StreamAlert function which contains:
@@ -37,22 +42,20 @@ def run(self, event, context):
         Args:
             event: An AWS event mapped to a specific source/entity (kinesis stream or
                 an s3 bucket event) containing data emitted to the stream.
-            context: An AWS context object which provides metadata on the currently
-                executing lambda function.

         Returns:
             None
         """
         LOGGER.debug('Number of Records: %d', len(event.get('Records', [])))

         config = load_config()
-        env = load_env(context)

         for record in event.get('Records', []):
             payload = StreamPayload(raw_record=record)
             classifier = StreamClassifier(config=config)

-            # If the kinesis stream or s3 bucket is not in our config, go onto the next record
+            # If the kinesis stream, s3 bucket, or sns topic is not in our config,
+            # go onto the next record
             if not classifier.map_source(payload):
                 continue

@@ -65,48 +68,61 @@ def run(self, event, context):
             else:
                 LOGGER.info('Unsupported service: %s', payload.service)

         # returns the list of generated alerts
         if self.return_alerts:
             return self.alerts
-        # send alerts to SNS
-        self._send_alerts(env, payload)
+
+        if self.env['lambda_alias'] == 'development':
+            LOGGER.info('%s alerts triggered', len(self.alerts))
+            LOGGER.info('\n%s\n', json.dumps(self.alerts, indent=4))

     def _kinesis_process(self, payload, classifier):
         """Process Kinesis data for alerts"""
         data = StreamPreParsers.pre_parse_kinesis(payload.raw_record)
-        self.process_alerts(classifier, payload, data)
+        self._process_alerts(classifier, payload, data)

     def _s3_process(self, payload, classifier):
         """Process S3 data for alerts"""
-        s3_file = StreamPreParsers.pre_parse_s3(payload.raw_record)
+        s3_file, s3_object_size = StreamPreParsers.pre_parse_s3(payload.raw_record)
+        count, processed_size = 0, 0
         for data in StreamPreParsers.read_s3_file(s3_file):
             payload.refresh_record(data)
-            self.process_alerts(classifier, payload, data)
+            self._process_alerts(classifier, payload, data)
+            # Add the current data to the total processed size, +1 to account for line feed
+            processed_size += (len(data) + 1)
+            count += 1
+            # Log an info message on every 100 lines processed
+            if count % 100 == 0:
+                avg_record_size = ((processed_size - 1) / count)
+                approx_record_count = s3_object_size / avg_record_size
+                LOGGER.info('Processed %s records out of an approximate total of %s '
+                            '(average record size: %s bytes, total size: %s bytes)',
+                            count, approx_record_count, avg_record_size, s3_object_size)

     def _sns_process(self, payload, classifier):
         """Process SNS data for alerts"""
         data = StreamPreParsers.pre_parse_sns(payload.raw_record)
-        self.process_alerts(classifier, payload, data)
-
-    def _send_alerts(self, env, payload):
-        """Send generated alerts to correct places"""
-        if self.alerts:
-            if env['lambda_alias'] == 'development':
-                LOGGER.info('%s alerts triggered', len(self.alerts))
-                LOGGER.info('\n%s\n', json.dumps(self.alerts, indent=4))
-            else:
-                StreamSink(self.alerts, env).sink()
-        elif payload.valid:
-            LOGGER.debug('Valid data, no alerts')
+        self._process_alerts(classifier, payload, data)

-    def process_alerts(self, classifier, payload, data):
-        """Process records for alerts"""
+    def _process_alerts(self, classifier, payload, data):
+        """Process records for alerts and send them to the correct places
+
+        Args:
+            classifier [StreamClassifier]: Handler for classifying a record's data
+            payload [StreamPayload]: StreamAlert payload object being processed
+            data [string]: Pre-parsed data string from a raw_event to be parsed
+        """
         classifier.classify_record(payload, data)
-        if payload.valid:
-            alerts = StreamRules.process(payload)
-            if alerts:
-                self.alerts.extend(alerts)
-        else:
-            LOGGER.error('Invalid data: %s\n%s',
-                         payload,
-                         json.dumps(payload.raw_record, indent=4))
+        if not payload.valid:
+            LOGGER.error('Invalid data: %s\n%s', payload, json.dumps(payload.raw_record, indent=4))
+            return
+
+        alerts = StreamRules.process(payload)
+        if not alerts:
+            LOGGER.debug('Valid data, no alerts')
+            return
+
+        if self.return_alerts:
+            self.alerts.extend(alerts)
+            return
+
+        self.sinker.sink(alerts)
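
The progress estimate in _s3_process is simple arithmetic; a worked example with made-up numbers. This codebase targets Python 2 (note the .next() calls in the unit tests), where / on ints already truncates; // below just makes that explicit:

count = 100                  # lines read so far
processed_size = 25100       # sum of len(line) + 1 per trailing line feed
s3_object_size = 1004000     # size reported by the S3 event record

avg_record_size = (processed_size - 1) // count           # 250 bytes
approx_record_count = s3_object_size // avg_record_size   # 4016 records
print(avg_record_size, approx_record_count)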
2 changes: 1 addition & 1 deletion stream_alert/rule_processor/main.py
@@ -22,4 +22,4 @@

 def handler(event, context):
     """Main Lambda handler function"""
-    StreamAlert().run(event, context)
+    StreamAlert(context).run(event)
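
A quick local smoke test of the new entry point might look like the sketch below; this is an assumption for illustration, not part of the commit. With a None context, load_env() takes the else branch shown in config.py above ('development' alias, 'us-east-1' region), and an empty Records list exercises the wiring without touching AWS:

from stream_alert.rule_processor.main import handler

# Assumes the repo's conf files are available for load_config()
handler({'Records': []}, None)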
16 changes: 8 additions & 8 deletions stream_alert/rule_processor/pre_parsers.py
@@ -24,7 +24,7 @@
 import boto3

 logging.basicConfig()
-logger = logging.getLogger('StreamAlert')
+LOGGER = logging.getLogger('StreamAlert')


 class S3ObjectSizeError(Exception):
     pass
@@ -64,7 +64,7 @@ def pre_parse_s3(cls, raw_record):
         size = int(raw_record['s3']['object']['size'])
         downloaded_s3_object = cls._download_s3_object(client, bucket, key, size)

-        return downloaded_s3_object
+        return downloaded_s3_object, size

     @classmethod
     def pre_parse_sns(cls, raw_record):
@@ -106,7 +106,7 @@ def read_s3_file(cls, downloaded_s3_object):
         # remove the file
         os.remove(downloaded_s3_object)
         if not os.path.exists(downloaded_s3_object):
-            logger.debug('Removed temp file - %s', downloaded_s3_object)
+            LOGGER.debug('Removed temp file - %s', downloaded_s3_object)

     @classmethod
     def _download_s3_object(cls, client, bucket, key, size):
@@ -131,15 +131,15 @@ def _download_s3_object(cls, client, bucket, key, size):
         if size_mb > 128:
             raise S3ObjectSizeError('S3 object to download is above 128MB')

-        logger.debug('/tmp directory contents:%s ', os.listdir('/tmp'))
-        logger.debug(os.popen('df -h /tmp | tail -1').read().strip())
+        LOGGER.debug('/tmp directory contents:%s ', os.listdir('/tmp'))
+        LOGGER.debug(os.popen('df -h /tmp | tail -1').read().strip())

         if size_mb:
             display_size = '{}MB'.format(size_mb)
         else:
             display_size = '{}KB'.format(size_kb)
-        logger.debug('Starting download from S3 - %s/%s [%s]',
-                     bucket, key, display_size)
+
+        LOGGER.debug('Starting download from S3 - %s/%s [%s]', bucket, key, display_size)

         suffix = key.replace('/', '-')
         _, downloaded_s3_object = tempfile.mkstemp(suffix=suffix)
@@ -148,6 +148,6 @@ def _download_s3_object(cls, client, bucket, key, size):
             client.download_fileobj(bucket, key, data)

         end_time = time.time() - start_time
-        logger.debug('Completed download in %s seconds', round(end_time, 2))
+        LOGGER.debug('Completed download in %s seconds', round(end_time, 2))

         return downloaded_s3_object
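
A rough sketch of consuming the new two-value return outside the handler, using moto's S3 mock as the unit tests do. The bucket, key, and the exact raw_record fields pre_parse_s3() reads are assumptions modeled on test/unit/test_pre_parsers.py:

import boto3
from moto import mock_s3
from stream_alert.rule_processor.pre_parsers import StreamPreParsers

@mock_s3
def demo():
    s3 = boto3.resource('s3', region_name='us-east-1')
    s3.create_bucket(Bucket='unit-test-bucket')  # hypothetical bucket
    s3.Object('unit-test-bucket', 'sample.log').put(Body=b'hello world')

    raw_record = {  # field layout assumed from the unit test fixture
        'awsRegion': 'us-east-1',
        's3': {
            'bucket': {'name': 'unit-test-bucket'},
            'object': {'key': 'sample.log', 'size': 11}
        }
    }
    s3_file, size = StreamPreParsers.pre_parse_s3(raw_record)
    print(size)  # 11, now returned alongside the temp file path

demo()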
51 changes: 25 additions & 26 deletions stream_alert/rule_processor/sink.py
@@ -24,7 +24,7 @@
 _SNS_MAX_SIZE = (256*1024)

 logging.basicConfig()
-logger = logging.getLogger('StreamAlert')
+LOGGER = logging.getLogger('StreamAlert')


 def json_dump(sns_dict, indent_value=None):
     def json_dict_serializer(obj):
@@ -41,16 +41,15 @@ class SNSMessageSizeError(Exception):
     pass


 class StreamSink(object):
-    def __init__(self, alerts, env):
-        self.alerts = alerts
+    def __init__(self, env):
         self.env = env
+        self.BOTO_CLIENT_SNS = boto3.client('sns', region_name=self.env['lambda_region'])

-    def sink(self):
+    def sink(self, alerts):
         """Sink triggered alerts from the StreamRules engine.
         Groups alerts to be sent to each sink, verifies that the
         sink exists in our configuration, and then sinks each
         group of alerts to the given SNS topic.
+        Args:
+            alerts [list]: a list of dictionaries representing json alerts
         Sends a message to SNS with the following JSON format:
             {default: [
@@ -71,15 +70,13 @@ def sink(self):
             ]}
         """
         lambda_alias = self.env['lambda_alias']
-
-        for alert in self.alerts:
+        for alert in alerts:
             sns_dict = {'default': alert}
             if lambda_alias == 'production':
                 topic_arn = self._get_sns_topic_arn()
-                client = boto3.client('sns', region_name=self.env['lambda_region'])
-                self.publish_message(client, json_dump(sns_dict), topic_arn)
-            elif lambda_alias == 'staging':
-                logger.info(json_dump(sns_dict, 2))
+                self.publish_message(self.BOTO_CLIENT_SNS, json_dump(sns_dict), topic_arn)
+            else:
+                LOGGER.error('Unsupported lambda alias: %s', lambda_alias)

     def _get_sns_topic_arn(self):
         """Return a properly formatted SNS ARN.
@@ -117,18 +114,20 @@ def publish_message(self, client, message, topic):
             message: A JSON string containing a serialized alert.
             topic: The SNS topic ARN to send to.
         """
-        if self._sns_message_size_check(message):
-            try:
-                response = client.publish(
-                    TopicArn=topic,
-                    Message=message,
-                    Subject='StreamAlert Rules Triggered'
-                )
-            except botocore.exceptions.ClientError as err:
-                logging.error('An error occurred while publishing alert: %s', err.response)
-                raise err
-            logger.info('Published %i alert(s) to %s', len(self.alerts), topic)
-            logger.info('SNS MessageID: %s', response['MessageId'])
-        else:
+        if not self._sns_message_size_check(message):
             logging.error('Cannot publish Alerts, message size is too big!')
             raise SNSMessageSizeError('SNS message size is too big! (Max: 256KB)')
+
+        try:
+            response = client.publish(
+                TopicArn=topic,
+                Message=message,
+                Subject='StreamAlert Rules Triggered'
+            )
+        except botocore.exceptions.ClientError as err:
+            logging.error('An error occurred while publishing alert: %s', err.response)
+            raise err
+
+        LOGGER.info('Published alert to %s', topic)
+        LOGGER.info('SNS MessageID: %s', response['MessageId'])
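
To make the message shape concrete, here is a sketch of what sink() hands to SNS for a single alert. The alert fields are hypothetical, and json.dumps stands in for the module's json_dump() helper:

import json

alert = {'rule_name': 'example_rule', 'metadata': {'source': 'prod.kinesis'}}
message = json.dumps({'default': alert})  # mirrors sns_dict = {'default': alert}

# Mirrors _sns_message_size_check against _SNS_MAX_SIZE (256KB)
assert len(message) <= 256 * 1024
print(message)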

3 changes: 2 additions & 1 deletion stream_alert_cli/test.py
@@ -113,7 +113,7 @@ def test_rule(rule_name, test_record, formatted_record):
     else:
         expected_alert_count = (0, 1)[test_record['trigger']]

-    alerts = StreamAlert(return_alerts=True).run(event, None)
+    alerts = StreamAlert(None, True).run(event)
     # we only want alerts for the specific rule passed in
     matched_alert_count = len([x for x in alerts if x['metadata']['rule_name'] == rule_name])

@@ -166,6 +166,7 @@ def format_record(test_record):
     # Set the S3 object key to a random value for testing
     test_record['key'] = ('{:032X}'.format(random.randrange(16**32)))
     template['s3']['object']['key'] = test_record['key']
+    template['s3']['object']['size'] = len(data)
     template['s3']['bucket']['arn'] = 'arn:aws:s3:::{}'.format(source)
     template['s3']['bucket']['name'] = source
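
This is the pattern the CLI test runner now uses end to end; a condensed sketch, where the event contents and rule name are placeholders:

from stream_alert.rule_processor.handler import StreamAlert

event = {'Records': []}  # a real test would hold formatted kinesis/s3/sns records
alerts = StreamAlert(None, return_alerts=True).run(event)
matched = [a for a in alerts if a['metadata']['rule_name'] == 'example_rule']
print(len(matched))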
5 changes: 3 additions & 2 deletions test/unit/test_pre_parsers.py
@@ -53,7 +53,7 @@ def test_pre_parse_s3():
             },
             'object': {
                 'key': key_name,
-                'size': 1000
+                'size': 30
             }
         }
     }
@@ -63,8 +63,9 @@ def test_pre_parse_s3():
     obj = s3_resource.Object(bucket_name, key_name)
     obj.put(Body=body_value)

-    s3_file = StreamPreParsers.pre_parse_s3(raw_record)
+    s3_file, size = StreamPreParsers.pre_parse_s3(raw_record)
     data = StreamPreParsers.read_s3_file(s3_file).next()
     assert_equal(body_value, data)
+    assert_equal(size, 30)

     BOTO_MOCKER.stop()
4 changes: 2 additions & 2 deletions test/unit/test_sink.py
@@ -42,13 +42,13 @@ def setup(self):

     def test_sns_topic_arn(self):
         """Sink SNS Messaging - Topic ARN"""
-        sinker = sink.StreamSink([], self.env)
+        sinker = sink.StreamSink(self.env)
         arn = sinker._get_sns_topic_arn()
         assert_equal(arn, 'arn:aws:sns:us-east-1:123456789012:unittest_prod_streamalerts')

     def test_message_size_check(self):
         """Sink SNS Messaging - Message Blob Size Check"""
-        sinker = sink.StreamSink([], self.env)
+        sinker = sink.StreamSink(self.env)
         passed = sinker._sns_message_size_check(get_payload(1000))
         assert_equal(passed, True)
         passed = sinker._sns_message_size_check(get_payload((256*1024)+1))
