From ea0f0bf8fca3ea9f0a0d95d697e9189a483d19f9 Mon Sep 17 00:00:00 2001 From: Jack Naglieri Date: Wed, 8 Feb 2017 17:48:23 -0800 Subject: [PATCH 1/9] [core] move pre-parsers into their own file and rename class * this change reduces complexity in classifier * uses a consistent naming scheme for pre-parser classes/methods * updates unit tests to reflect changes --- main.py | 7 +- stream_alert/__init__.py | 2 +- stream_alert/classifier.py | 116 ------------------------- stream_alert/pre_parsers.py | 149 +++++++++++++++++++++++++++++++++ test/unit/test_classifier.py | 5 +- test/unit/test_rules_engine.py | 5 +- 6 files changed, 160 insertions(+), 124 deletions(-) create mode 100644 stream_alert/pre_parsers.py diff --git a/main.py b/main.py index 48a655d8c..c45b569b8 100644 --- a/main.py +++ b/main.py @@ -18,7 +18,8 @@ import os from stream_alert.config import load_config, load_env -from stream_alert.classifier import StreamPayload, StreamPayloadHelpers +from stream_alert.classifier import StreamPayload +from stream_alert.pre_parsers import StreamPreParsers from stream_alert.rules_engine import StreamRules from stream_alert.sink import StreamSink @@ -58,7 +59,7 @@ def handler(event, context): payload.map_source() if payload.service == 's3': - s3_file_lines = StreamPayloadHelpers.parse_s3_object(payload.raw_record) + s3_file_lines = StreamPreParsers.pre_parse_s3(payload.raw_record) for line in s3_file_lines: data = line.rstrip() payload.refresh_record(data) @@ -66,7 +67,7 @@ def handler(event, context): process_alerts(payload, alerts_to_send) elif payload.service == 'kinesis': - data = StreamPayloadHelpers.pre_parse_kinesis(payload.raw_record) + data = StreamPreParsers.pre_parse_kinesis(payload.raw_record) payload.classify_record(data) process_alerts(payload, alerts_to_send) diff --git a/stream_alert/__init__.py b/stream_alert/__init__.py index 75977e6f4..6df8ed024 100644 --- a/stream_alert/__init__.py +++ b/stream_alert/__init__.py @@ -1 +1 @@ -__version__ = '1.0.0' \ No newline at end of file +__version__ = '1.1.0' \ No newline at end of file diff --git a/stream_alert/classifier.py b/stream_alert/classifier.py index be62fa345..6807d269f 100644 --- a/stream_alert/classifier.py +++ b/stream_alert/classifier.py @@ -34,9 +34,6 @@ logging.basicConfig() logger = logging.getLogger('StreamAlert') -class S3ObjectSizeError(Exception): - pass - class InvalidSchemaError(Exception): pass @@ -447,116 +444,3 @@ def parse_syslog(self, data, args): return syslog_payload else: return False - -class StreamPayloadHelpers(object): - """Helper functions to parse incoming data into a string for classificaiton""" - @classmethod - def pre_parse_kinesis(cls, raw_record): - """Decode a Kinesis record. - - Args: - raw_record (dict): A Kinesis event record. - - Returns: (string) Base64 decoded data. - """ - return base64.b64decode(raw_record['kinesis']['data']) - - @classmethod - def _download_s3_object(cls, client, bucket, key, size): - """Download an object from S3. - - Verifies the S3 object is less than or equal to 128MB, and - stores into a temp file. Lambda can only execute for a - maximum of 300 seconds, and the file to download - greatly impacts that time. - - Args: - client: boto3 s3 client object - bucket (string): s3 bucket to download object from - key (string): key of s3 object - size (int): size of s3 object in bytes - - Returns: - (string) The downloaded path of the S3 object. - """ - size_kb = size / 1024 - size_mb = size_kb / 1024 - if size_mb > 128: - raise S3ObjectSizeError('S3 object to download is above 500MB') - - logger.debug('/tmp directory contents:%s ', os.listdir('/tmp')) - logger.debug(os.system('df -h /tmp | tail -1')) - - if size_mb: - display_size = '{}MB'.format(size_mb) - else: - display_size = '{}KB'.format(size_kb) - logger.info('Starting download from S3 - %s/%s [%s]', - bucket, key, display_size) - - suffix = key.replace('/', '-') - _, downloaded_s3_object = tempfile.mkstemp(suffix=suffix) - with open(downloaded_s3_object, 'wb') as data: - start_time = time.time() - client.download_fileobj(bucket, key, data) - - end_time = time.time() - start_time - logger.info('Completed download in %s seconds', round(end_time, 2)) - - return downloaded_s3_object - - @classmethod - def _read_s3_file(cls, downloaded_s3_object): - """Parse a downloaded file from S3 - - Supports reading both gzipped files and plaintext files. Truncates - files after reading to save space on /tmp mount. - - Args: - downloaded_s3_object (string): A full path to the downloaded file. - - Returns: - (list) Lines from the downloaded s3 object - """ - lines = [] - filename, extension = os.path.splitext(downloaded_s3_object) - - if extension == '.gz': - with gzip.open(downloaded_s3_object, 'r') as f: - lines = f.readlines() - # truncate file - clear_file = gzip.open(downloaded_s3_object, 'w') - clear_file.close() - - else: - with open(downloaded_s3_object, 'r') as f: - lines = f.readlines() - # truncate file - clear_file = open(downloaded_s3_object, 'w') - clear_file.close() - - # remove file path - os.remove(downloaded_s3_object) - if not os.path.exists(downloaded_s3_object): - logger.info('Removed temp file - %s', downloaded_s3_object) - - return lines - - @classmethod - def parse_s3_object(cls, raw_record): - """Given an S3 record, download and parse the data. - - Args: - raw_record (dict): A S3 event record. - - Returns: - (list) Lines from the downloaded s3 object - """ - client = boto3.client('s3', region_name=raw_record['awsRegion']) - unquote = lambda data: urllib.unquote(data).decode('utf8') - bucket = unquote(raw_record['s3']['bucket']['name']) - key = unquote(raw_record['s3']['object']['key']) - size = int(raw_record['s3']['object']['size']) - downloaded_s3_object = cls._download_s3_object(client, bucket, key, size) - - return cls._read_s3_file(downloaded_s3_object) diff --git a/stream_alert/pre_parsers.py b/stream_alert/pre_parsers.py new file mode 100644 index 000000000..8e4575f29 --- /dev/null +++ b/stream_alert/pre_parsers.py @@ -0,0 +1,149 @@ +''' +Copyright 2017-present, Airbnb Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +''' + +import base64 +import gzip +import logging +import os +import tempfile +import time +import urllib + +import boto3 + +logging.basicConfig() +logger = logging.getLogger('StreamAlert') + +class S3ObjectSizeError(Exception): + pass + +class StreamPreParsers(object): + """A collection of pre-parsers to get data for classificaiton + + The pre-parser's job is to read raw records from whichever + source invoked AWS Lambda (Kinesis, S3, etc), and perform all + necessary actions to get either a string or a collection of strings + """ + @classmethod + def pre_parse_kinesis(cls, raw_record): + """Decode a Kinesis record. + + Args: + raw_record (dict): A Kinesis event record. + + Returns: (string) Base64 decoded data. + """ + return base64.b64decode(raw_record['kinesis']['data']) + + @classmethod + def pre_parse_s3(cls, raw_record): + """Given an S3 record, download and parse the data. + + Args: + raw_record (dict): A S3 event record. + + Returns: + (list) Lines from the downloaded s3 object + """ + client = boto3.client('s3', region_name=raw_record['awsRegion']) + unquote = lambda data: urllib.unquote(data).decode('utf8') + bucket = unquote(raw_record['s3']['bucket']['name']) + key = unquote(raw_record['s3']['object']['key']) + size = int(raw_record['s3']['object']['size']) + downloaded_s3_object = cls._download_s3_object(client, bucket, key, size) + + return cls._read_s3_file(downloaded_s3_object) + + @classmethod + def _download_s3_object(cls, client, bucket, key, size): + """Download an object from S3. + + Verifies the S3 object is less than or equal to 128MB, and + stores into a temp file. Lambda can only execute for a + maximum of 300 seconds, and the file to download + greatly impacts that time. + + Args: + client: boto3 s3 client object + bucket (string): s3 bucket to download object from + key (string): key of s3 object + size (int): size of s3 object in bytes + + Returns: + (string) The downloaded path of the S3 object. + """ + size_kb = size / 1024 + size_mb = size_kb / 1024 + if size_mb > 128: + raise S3ObjectSizeError('S3 object to download is above 500MB') + + logger.debug('/tmp directory contents:%s ', os.listdir('/tmp')) + logger.debug(os.system('df -h /tmp | tail -1')) + + if size_mb: + display_size = '{}MB'.format(size_mb) + else: + display_size = '{}KB'.format(size_kb) + logger.info('Starting download from S3 - %s/%s [%s]', + bucket, key, display_size) + + suffix = key.replace('/', '-') + _, downloaded_s3_object = tempfile.mkstemp(suffix=suffix) + with open(downloaded_s3_object, 'wb') as data: + start_time = time.time() + client.download_fileobj(bucket, key, data) + + end_time = time.time() - start_time + logger.info('Completed download in %s seconds', round(end_time, 2)) + + return downloaded_s3_object + + @classmethod + def _read_s3_file(cls, downloaded_s3_object): + """Parse a downloaded file from S3 + + Supports reading both gzipped files and plaintext files. Truncates + files after reading to save space on /tmp mount. + + Args: + downloaded_s3_object (string): A full path to the downloaded file. + + Returns: + (list) Lines from the downloaded s3 object + """ + lines = [] + filename, extension = os.path.splitext(downloaded_s3_object) + + if extension == '.gz': + with gzip.open(downloaded_s3_object, 'r') as f: + lines = f.readlines() + # truncate file + clear_file = gzip.open(downloaded_s3_object, 'w') + clear_file.close() + + else: + with open(downloaded_s3_object, 'r') as f: + lines = f.readlines() + # truncate file + clear_file = open(downloaded_s3_object, 'w') + clear_file.close() + + # remove file path + os.remove(downloaded_s3_object) + if not os.path.exists(downloaded_s3_object): + logger.info('Removed temp file - %s', downloaded_s3_object) + + return lines diff --git a/test/unit/test_classifier.py b/test/unit/test_classifier.py index d626dc3a2..0982fa60a 100644 --- a/test/unit/test_classifier.py +++ b/test/unit/test_classifier.py @@ -24,7 +24,8 @@ from nose.tools import assert_equal, assert_not_equal, nottest -from stream_alert.classifier import StreamPayload, StreamPayloadHelpers +from stream_alert.classifier import StreamPayload +from stream_alert.pre_parsers import StreamPreParsers from stream_alert.config import load_config class TestStreamPayload(object): @@ -40,7 +41,7 @@ def teardown_class(cls): @staticmethod def pre_parse_kinesis(payload): - return StreamPayloadHelpers.pre_parse_kinesis(payload.raw_record) + return StreamPreParsers.pre_parse_kinesis(payload.raw_record) @staticmethod def make_kinesis_record(**kwargs): diff --git a/test/unit/test_rules_engine.py b/test/unit/test_rules_engine.py index d98fb63a6..1eeaf71fa 100644 --- a/test/unit/test_rules_engine.py +++ b/test/unit/test_rules_engine.py @@ -19,7 +19,8 @@ from nose.tools import assert_equal, assert_not_equal, nottest -from stream_alert.classifier import StreamPayload, StreamPayloadHelpers +from stream_alert.classifier import StreamPayload +from stream_alert.pre_parsers import StreamPreParsers from stream_alert.config import load_config from stream_alert.rules_engine import StreamRules @@ -53,7 +54,7 @@ def teardown(self): @staticmethod def pre_parse_kinesis(payload): - return StreamPayloadHelpers.pre_parse_kinesis(payload.raw_record) + return StreamPreParsers.pre_parse_kinesis(payload.raw_record) def make_kinesis_payload(self, **kwargs): kinesis_stream = kwargs.get('kinesis_stream') From b7b2dd0f178c9be92e4094e159d02cd3286f5804 Mon Sep 17 00:00:00 2001 From: Jack Naglieri Date: Wed, 8 Feb 2017 18:15:10 -0800 Subject: [PATCH 2/9] [core] use matchers as a guard for rules. Previously, all matchers and rules were evaluated in parallel. We can save processing if we only evaluate rules if matchers return True. This change also cleans up the rules_engine.process flow. --- stream_alert/rules_engine.py | 44 +++++++++++++++++++++--------------- 1 file changed, 26 insertions(+), 18 deletions(-) diff --git a/stream_alert/rules_engine.py b/stream_alert/rules_engine.py index 5b891ec5f..93c7bf806 100644 --- a/stream_alert/rules_engine.py +++ b/stream_alert/rules_engine.py @@ -163,17 +163,17 @@ def process_subkeys(cls, payload, rule): def process(cls, input_payload): """Process rules on a record. - Get the list of rules based on the record's data source name. + Gather a list of rules based on the record's datasource type. For each rule, evaluate the record through all listed matchers - and the rule itself to determine if a match occurs. If so, - send the record to its listed `sink`. + and the rule itself to determine if a match occurs. Returns: - An array of triggered alerts. An alert is represented as - a dictionary with the following keys: - rule_name - the name of the triggered rule - payload - the StreamPayload object - outputs - list of outputs to send to + List of alerts. + + An alert is represented as a dictionary with the following keys: + rule_name: the name of the triggered rule + payload: the StreamPayload object + outputs: list of outputs to send to """ rules = [] alerts = [] @@ -185,16 +185,24 @@ def process(cls, input_payload): if len(rules) > 0: for rule in rules: + # subkey check has_sub_keys = cls.process_subkeys(payload, rule) - if has_sub_keys: - matcher_result = cls.match_event(payload, rule) - rule_result = cls.process_rule(payload, rule) - if matcher_result and rule_result: - alert = { - 'rule_name': rule.rule_name, - 'payload': payload, - 'outputs': rule.outputs - } - alerts.append(alert) + if not has_sub_keys: + continue + + # matcher check + matcher_result = cls.match_event(payload, rule) + if not matcher_result: + continue + + # rule analysis + rule_result = cls.process_rule(payload, rule) + if rule_result: + alert = { + 'rule_name': rule.rule_name, + 'payload': payload, + 'outputs': rule.outputs + } + alerts.append(alert) return alerts From 9724b0caefc527d5c62fa039e83d6d8fe6513ad4 Mon Sep 17 00:00:00 2001 From: Jack Naglieri Date: Thu, 9 Feb 2017 14:24:03 -0800 Subject: [PATCH 3/9] [core] make parsers modular Overall, this refactor provides a common interface for all parsers. By namespacing parsers into their own classes, we get the benefit of parser specific helper methods, along with a consistent structure for all parsers. Change summary: * Move all parser logic out of StreamClassifier into a new parsers.py file * Create an Abstract Base Class for Parsers to inherit from * Register new parser classes with a parser decorator * Expose a get_parser method to other modules --- stream_alert/classifier.py | 217 ++++------------------------- stream_alert/parsers.py | 278 +++++++++++++++++++++++++++++++++++++ 2 files changed, 308 insertions(+), 187 deletions(-) create mode 100644 stream_alert/parsers.py diff --git a/stream_alert/classifier.py b/stream_alert/classifier.py index 6807d269f..5a4e432e8 100644 --- a/stream_alert/classifier.py +++ b/stream_alert/classifier.py @@ -31,12 +31,17 @@ import boto3 +from stream_alert.parsers import get_parser + logging.basicConfig() logger = logging.getLogger('StreamAlert') class InvalidSchemaError(Exception): pass +# class StreamClassifier(object): +# + class StreamPayload(object): """Classify and parse a raw record into its declared type. @@ -81,9 +86,9 @@ def __init__(self, **kwargs): self.type = None self.log_source = None self.record = None + self.raw_record = kwargs.get('raw_record') self.log_metadata = kwargs.get('log_metadata', None) - self.raw_record = kwargs.get('raw_record') self.env = kwargs.get('env') self.config = kwargs.get('config') @@ -210,32 +215,35 @@ def _parse(self, data): """ for log_name, attributes in self.log_metadata.iteritems(): if not self.type: - parser = attributes.get('parser') + parser_name = attributes['parser'] else: - parser = self.type - parser_method = getattr(self, 'parse_{}'.format(parser)) + parser_name = self.type - args = {} - args['schema'] = attributes.get('schema') - args['hints'] = attributes.get('hints') - args['parser'] = parser - args['delimiter'] = attributes.get('delimiter') - args['separator'] = attributes.get('separator') + options = {} + options['hints'] = attributes.get('hints') + options['delimiter'] = attributes.get('delimiter') + options['separator'] = attributes.get('separator') + options['parser'] = parser_name + options['service'] = self.service + schema = attributes['schema'] + + parser_class = get_parser(parser_name) + parser = parser_class(data, schema, options) + parsed_data = parser.parse() - parsed_data = parser_method(data, args) logger.debug('log_name: %s', log_name) logger.debug('parsed_data: %s', parsed_data) if parsed_data: - parsed_and_typed_data = self._convert_type(parsed_data, args) + parsed_and_typed_data = self._convert_type(parsed_data, schema, options) if parsed_and_typed_data: self.log_source = log_name - self.type = parser - self.record = parsed_data + self.type = parser_name + self.record = parsed_and_typed_data return True return False - def _convert_type(self, payload, args): + def _convert_type(self, parsed_data, schema, options): """Convert a parsed payload's values into their declared types. If the schema is incorrectly defined for a particular field, @@ -244,12 +252,12 @@ def _convert_type(self, payload, args): Args: payload (dict): parsed payload object - args (dict): log type schema denoting keys with their value types + options (dict): log type schema denoting keys with their value types Returns: (dict) parsed payload with typed values """ - schema = args['schema'] + payload = parsed_data for key, value in schema.iteritems(): key = str(key) # if the schema value is declared as string @@ -268,179 +276,14 @@ def _convert_type(self, payload, args): if len(value) == 0: pass else: - args['schema'] = schema[key] + schema = schema[key] # handle nested csv if isinstance(payload[key], str): - args['hints'] = args['hints'][key] - payload[key] = self.parse_csv(payload[key], args) - self._convert_type(payload[key], args) + options['hints'] = options['hints'][key] + parse_csv = get_parser('csv') + payload[key] = parse_csv(payload[key], schema, options).parse() + self._convert_type(payload[key], schema, options) else: logger.error('Invalid declared type - %s', value) return payload - - def parse_json(self, data, args): - """Parse a string into JSON. - - Args: - data (str): A decoded data string from a Lambda event. - args (dict): All parser arguments, JSON uses: - schema: Log type structure, including keys and their type. - - Returns: - A dictionary representing the data passed in. - False if the data is not JSON or the keys do not match. - """ - schema = args['schema'] - try: - json_payload = json.loads(data) - self.type = 'json' - except ValueError: - return False - - # top level key check - if set(json_payload.keys()) == set(schema.keys()): - # subkey check - for key, key_type in schema.iteritems(): - # if the key is a map of key/value pairs - if isinstance(key_type, dict) and key_type != {}: - if set(json_payload[key].keys()) != set(schema[key].keys()): - return False - return json_payload - else: - # logger.debug('JSON Key mismatch: %s vs. %s', json_payload.keys(), args['keys']) - return False - - def parse_csv(self, data, args): - """Parse a string into a comma separated value reader object. - - Args: - data (str): A decoded data string from a Lambda event. - args (dict): All parser arguments, CSV uses: - schema: Log type structure, including keys and their type. - hints: A list of string wildcards to find in data. - - Returns: - A dict of the parsed CSV record - """ - schema = args['schema'] - hints = args['hints'] - hint_result = [] - csv_payload = {} - - if self.service == 's3': - try: - csv_data = StringIO.StringIO(data) - reader = csv.DictReader(csv_data, delimiter=',') - except ValueError: - return False - - elif self.service == 'kinesis': - try: - csv_data = StringIO.StringIO(data) - reader = csv.reader(csv_data, delimiter=',') - except ValueError: - return False - - if reader and hints: - for row in reader: - # check number of columns match and any hints match - logger.debug('hint result: %s', hint_result) - if len(row) == len(schema): - for field, hint_list in hints.iteritems(): - # handle nested hints - if not isinstance(hint_list, list): - continue - # the hint field index in the row - field_index = schema.keys().index(field) - # store results per hint - hint_group_result = [] - for hint in hint_list: - hint_group_result.append(fnmatch(row[field_index], hint)) - # append the result of any of the hints being True - hint_result.append(any(hint_group_result)) - # if all hint group results are True - if all(hint_result): - self.type = 'csv' - for index, key in enumerate(schema): - csv_payload[key] = row[index] - return csv_payload - else: - # logger.debug('CSV Key mismatch: %s vs. %s', len(row), len(schema)) - return False - else: - return False - - def parse_kv(self, data, args): - """Parse a key value string into a dictionary. - - Args: - data (str): A decoded data string from a Lambda event. - args (dict): All parser arguments, KV uses: - schema: Log type structure, including keys and their type. - delimiter: The character between key/value pairs. - separator: The character between keys and values. - - Returns: - (dict) of the loaded key value pairs - """ - delimiter = args['delimiter'] - separator = args['separator'] - schema = args['schema'] - kv_payload = {} - - # remove any blank strings that may exist in our list - fields = filter(None, data.split(delimiter)) - # first check the field length matches our # of keys - if len(fields) == len(schema): - regex = re.compile('.+{}.+'.format(separator)) - for index, field in enumerate(fields): - # verify our fields match the kv regex - if regex.match(field): - key, value = field.split(separator) - # handle duplicate keys - if key in kv_payload: - # load key from our configuration - kv_payload[schema.keys()[index]] = value - else: - # load key from data - kv_payload[key] = value - else: - logger.error('key/value regex failure for %s', field) - self.type = 'kv' - return kv_payload - else: - return False - - def parse_syslog(self, data, args): - """Parse a syslog string into a dictionary - - Matches syslog events with the following format: - timestamp(Month DD HH:MM:SS) host application: message - Example(s): - Jan 10 19:35:33 vagrant-ubuntu-trusty-64 sudo: session opened for root - Jan 10 19:35:13 vagrant-ubuntu-precise-32 ssh[13941]: login for mike - - Args: - data (str): A decoded data string from a Lambda event. - args (dict): All parser arguments, Syslog uses: - schema: Log type structure, including keys and their type. - - Returns: - (dict) syslog key-value pairs - """ - schema = args['schema'] - syslog_payload = {} - syslog_regex = re.compile(r"(?P^\w{3}\s\d{2}\s(\d{2}:?)+)\s" - r"(?P(\w[-]*)+)\s" - r"(?P\w+)(\[\w+\])*:\s" - r"(?P.*$)") - - match = syslog_regex.search(data) - if match: - for key in schema.keys(): - syslog_payload[key] = match.group(key) - self.type = 'syslog' - return syslog_payload - else: - return False diff --git a/stream_alert/parsers.py b/stream_alert/parsers.py new file mode 100644 index 000000000..0dd252075 --- /dev/null +++ b/stream_alert/parsers.py @@ -0,0 +1,278 @@ +''' +Copyright 2017-present, Airbnb Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +''' + +import csv +import json +import logging +import re +import StringIO + +from abc import ABCMeta, abstractmethod +from fnmatch import fnmatch + +logging.basicConfig() +logger = logging.getLogger('StreamAlert') + +def get_parser(parserid): + """Helper method to fetch parser classes + + Args: + parserid: the name of the parser class to get + + Returns: + - A Parser class + """ + return PARSERS[parserid] + +PARSERS = {} +def parser(cls): + """Class decorator to register parsers""" + PARSERS[cls.__parserid__] = cls + return cls + +class ParserBase: + """Abstract Parser class to be inherited by all StreamAlert Parsers""" + __metaclass__ = ABCMeta + + def __init__(self, data, schema, options): + """Setup required parser properties + + Args: + data: Data string to be parsed. + schema: Dict of log data schema. + options: Parser options dict - delimiter, separator, or hints + """ + self.data = data + self.schema = schema + self.options = options + # If we can parse into a correct type, but keys or other config + # options do not match up, we can set a type in the payload object + # to short circuit type determination. + self.payload_type = None + + @abstractmethod + def parse(self): + """Main parser method to be overridden by all Parser classes""" + pass + +@parser +class JSONParser(ParserBase): + __parserid__ = 'json' + + def parse(self): + """Parse a string into JSON. + + Options: + - None + + Returns: + - A dict of the parsed JSON record. + - False if the data is not JSON or the columns do not match. + """ + schema = self.schema + data = self.data + + try: + json_payload = json.loads(data) + self.payload_type = 'json' + except ValueError: + return False + + # top level key check + json_keys = set(json_payload.keys()) + schema_keys = set(schema.keys()) + if json_keys == schema_keys: + # subkey check + for key, key_type in schema.iteritems(): + # if the key is a map of key/value pairs + if isinstance(key_type, dict) and key_type != {}: + if set(json_payload[key].keys()) != set(schema[key].keys()): + return False + return json_payload + else: + logger.debug('JSON Key mismatch: %s vs. %s', json_keys, schema_keys) + return False + +@parser +class CSVParser(ParserBase): + __parserid__ = 'csv' + __default_delimiter = ',' + + def _get_reader(self): + """Return the CSV reader for the given payload source + + Returns: + - CSV reader object if the parse was successful + - False if parse was unsuccessful + """ + data = self.data + service = self.options['service'] + delimiter = self.options['delimiter'] or self.__default_delimiter + + if service == 's3': + try: + csv_data = StringIO.StringIO(data) + reader = csv.DictReader(csv_data, delimiter=delimiter) + except ValueError: + return False + + elif service == 'kinesis': + try: + csv_data = StringIO.StringIO(data) + reader = csv.reader(csv_data, delimiter=delimiter) + except ValueError: + return False + + return reader + + def parse(self): + """Parse a string into a comma separated value reader object. + + Options: + - hints: A dict of string wildcards to find in payload fields. + + Returns: + - A dict of the parsed CSV record. + - False if the data is not CSV or the columns do not match. + """ + schema = self.schema + hints = self.options['hints'] + + hint_result = [] + csv_payload = {} + + reader = self._get_reader() + if not reader: + return False + + for row in reader: + # check number of columns match and any hints match + if len(row) != len(schema): + logger.debug('CSV Key mismatch: %s vs. %s', len(row), len(schema)) + return False + + for field, hint_list in hints.iteritems(): + # handle nested hints + if not isinstance(hint_list, list): + continue + # the hint field index in the row + field_index = schema.keys().index(field) + # store results per hint + hint_group_result = [] + for hint in hint_list: + hint_group_result.append(fnmatch(row[field_index], hint)) + # append the result of any of the hints being True + hint_result.append(any(hint_group_result)) + + # if all hint group results are True + logger.debug('hint result: %s', hint_result) + if all(hint_result): + self.payload_type = 'csv' + for index, key in enumerate(schema): + csv_payload[key] = row[index] + + return csv_payload + +@parser +class KVParser(ParserBase): + __parserid__ = 'kv' + __default_separator = '=' + __default_delimiter = ' ' + + def parse(self): + """Parse a key value string into a dictionary. + + Options: + - delimiter: The character between key/value pairs. + - separator: The character between keys and values. + + Returns: + - A dict of the loaded key value pairs. + - False if the columns do not match. + """ + data = self.data + schema = self.schema + options = self.options + + delimiter = options['delimiter'] or self.__default_delimiter + separator = options['separator'] or self.__default_separator + + kv_payload = {} + + # remove any blank strings that may exist in our list + fields = filter(None, data.split(delimiter)) + # first check the field length matches our # of keys + if len(fields) != len(schema): + logger.debug('Parsed KV fields: %s', fields) + return False + + regex = re.compile('.+{}.+'.format(separator)) + for index, field in enumerate(fields): + # verify our fields match the kv regex + if regex.match(field): + key, value = field.split(separator) + # handle duplicate keys + if key in kv_payload: + # load key from our configuration + kv_payload[schema.keys()[index]] = value + else: + # load key from data + kv_payload[key] = value + else: + logger.error('key/value regex failure for %s', field) + + self.payload_type = 'kv' + + return kv_payload + +@parser +class SyslogParser(ParserBase): + __parserid__ = 'syslog' + + def parse(self): + """Parse a syslog string into a dictionary + + Matches syslog events with the following format: + timestamp(Month DD HH:MM:SS) host application: message + Example(s): + Jan 10 19:35:33 vagrant-ubuntu-trusty-64 sudo: session opened for root + Jan 10 19:35:13 vagrant-ubuntu-precise-32 ssh[13941]: login for mike + + Options: + - None + + Returns: + - A dict of syslog key-value pairs. + - False if the data does not match the syslog regex. + """ + schema = self.schema + data = self.data + + syslog_payload = {} + syslog_regex = re.compile(r"(?P^\w{3}\s\d{2}\s(\d{2}:?)+)\s" + r"(?P(\w[-]*)+)\s" + r"(?P\w+)(\[\w+\])*:\s" + r"(?P.*$)") + + match = syslog_regex.search(data) + if not match: + return False + + self.payload_type = 'syslog' + for key in schema.keys(): + syslog_payload[key] = match.group(key) + + return syslog_payload From e653331fc1e2bc61450bae209f36302d083e33b0 Mon Sep 17 00:00:00 2001 From: Jack Naglieri Date: Thu, 9 Feb 2017 18:16:38 -0800 Subject: [PATCH 4/9] [core] classifier refactor Please enter the commit message for your changes. Lines starting --- main.py | 17 +++- stream_alert/classifier.py | 184 ++++++++++++++++++++----------------- 2 files changed, 111 insertions(+), 90 deletions(-) diff --git a/main.py b/main.py index c45b569b8..334d92a64 100644 --- a/main.py +++ b/main.py @@ -18,7 +18,7 @@ import os from stream_alert.config import load_config, load_env -from stream_alert.classifier import StreamPayload +from stream_alert.classifier import StreamPayload, StreamClassifier from stream_alert.pre_parsers import StreamPreParsers from stream_alert.rules_engine import StreamRules from stream_alert.sink import StreamSink @@ -52,23 +52,30 @@ def handler(event, context): config = load_config() env = load_env(context) + # process_alerts(event['Records']) alerts_to_send = [] + # TODO(jack): Move this into classification for record in event.get('Records'): - payload = StreamPayload(raw_record=record, config=config, env=env) - payload.map_source() + payload = StreamPayload(raw_record=record) + classifier = StreamClassifier(config=config) + classifier.map_source(payload) + # If the kinesis stream or s3 bucket is not in our config, + # go onto the next record. + if not payload.valid_source: + continue if payload.service == 's3': s3_file_lines = StreamPreParsers.pre_parse_s3(payload.raw_record) for line in s3_file_lines: data = line.rstrip() payload.refresh_record(data) - payload.classify_record(data) + classifier.classify_record(payload, data) process_alerts(payload, alerts_to_send) elif payload.service == 'kinesis': data = StreamPreParsers.pre_parse_kinesis(payload.raw_record) - payload.classify_record(data) + classifier.classify_record(payload, data) process_alerts(payload, alerts_to_send) if alerts_to_send: diff --git a/stream_alert/classifier.py b/stream_alert/classifier.py index 5a4e432e8..5b6880a63 100644 --- a/stream_alert/classifier.py +++ b/stream_alert/classifier.py @@ -14,22 +14,9 @@ limitations under the License. ''' -import base64 -import csv -import gzip -import json import logging -import os -import re -import StringIO -import tempfile -import time -import urllib from collections import OrderedDict -from fnmatch import fnmatch - -import boto3 from stream_alert.parsers import get_parser @@ -37,17 +24,22 @@ logger = logging.getLogger('StreamAlert') class InvalidSchemaError(Exception): + """Raise this exception if a declared schema field type does not match + the data passed.""" pass -# class StreamClassifier(object): -# class StreamPayload(object): - """Classify and parse a raw record into its declared type. + """Container class for the StreamAlert payload object. Attributes: + raw_record: The record from the AWS Lambda Records dictionary. + valid: A boolean representing if the record is deemed valid by - parsing and classification logic. + parsing and classification. + + valid_source: A boolean of if the record source is declared in + the sources.json configuration file. service: The aws service where the record originated from. Can be either S3 or kinesis. @@ -58,43 +50,43 @@ class StreamPayload(object): log_source: The name of the logging application which the data originated from. This could be osquery, auditd, etc. - type: The data type of the record - json, csv, syslog, or kv. - - record: A typed record. + type: The data type of the record - json, csv, syslog, etc. - s3_file: A full path to a downloaded file from S3. + record: A parsed and typed record. Public Methods: - classify_record - parse_json - parse_csv - parse_syslog + refresh_record """ def __init__(self, **kwargs): """ Keyword Args: - env (dict): Loaded environment data about the currently running lambda - raw_record (dict): The record to be parsed - log_metadata (dict): Log sources and their attributes for the entity - config (dict): Loaded JSON configuration files. Contains two keys: - logs, and sources + raw_record (dict): The record to be parsed - in AWS event format """ - self.valid = False + self.raw_record = kwargs['raw_record'] + self.service = None self.entity = None self.type = None self.log_source = None self.record = None - self.raw_record = kwargs.get('raw_record') - self.log_metadata = kwargs.get('log_metadata', None) - self.env = kwargs.get('env') - self.config = kwargs.get('config') + self.valid = False + self.valid_source = None def __repr__(self): - return ''.format( - self.valid, self.log_source, self.entity, self.type, self.record) + repr_str = ('' + ).format(self.valid, + self.log_source, + self.entity, + self.type, + self.record) + + return repr_str def refresh_record(self, new_record): """Replace the currently loaded record with a new one. @@ -112,23 +104,32 @@ def refresh_record(self, new_record): self.type = None self.raw_record = new_record - def map_source(self): + +class StreamClassifier(object): + """Classify, map source, and parse a raw record into its declared type.""" + def __init__(self, **kwargs): + self.config = kwargs['config'] + + def map_source(self, payload): """Map a record to its originating AWS service and entity. Each raw record contains a set of keys to represent its source. A Kinesis record will contain a `kinesis` key while a S3 record contains `s3`. + Args: + payload: A StreamAlert payload object + Sets: - self.service: The AWS service which sent the record - self.entity: The specific instance of a service which sent the record - self.log_metadata: All logs for a declared entity, with their attrs. + payload.service: The AWS service which sent the record + payload.entity: The specific instance of a service which sent the record + payload.valid_source: Validates the record source """ # check raw record for either kinesis or s3 keys - if 'kinesis' in self.raw_record: - self.service = 'kinesis' - elif 's3' in self.raw_record: - self.service = 's3' + if 'kinesis' in payload.raw_record: + payload.service = 'kinesis' + elif 's3' in payload.raw_record: + payload.service = 's3' # map the entity name from a record entity_mapper = { @@ -136,23 +137,26 @@ def map_source(self): 's3': lambda r: r['s3']['bucket']['name'] } # get the entity name - self.entity = entity_mapper[self.service](self.raw_record) + payload.entity = entity_mapper[payload.service](payload.raw_record) - # get all entities for the configured service (s3 or kinesis) - all_service_entities = self.config['sources'][self.service] - config_entity = all_service_entities.get(self.entity) + # if the payload's entity is found in the config and contains logs + if self._payload_logs(payload): + payload.valid_source = True + def _payload_logs(self, payload): + # get all logs for the configured service/enetity (s3 or kinesis) + all_service_entities = self.config['sources'][payload.service] + config_entity = all_service_entities.get(payload.entity) if config_entity: - entity_log_sources = config_entity['logs'] - self.log_metadata = self._log_metadata(entity_log_sources, self.config.get('logs')) + return config_entity['logs'] + else: + return False - @staticmethod - def _log_metadata(entity_log_sources, all_config_logs): + def log_metadata(self, payload): """Return a mapping of all log sources to a given entity with attributes. Args: - entity_log_sources (list): All log sources declared for a source entity. - all_config_logs (dict): JSON loaded conf/logs.conf file. + payload: A StreamAlert payload object to be mapped Returns: (dict) log sources and their attributes for the entity: @@ -169,14 +173,18 @@ def _log_metadata(entity_log_sources, all_config_logs): } """ metadata = {} + + all_config_logs = self.config['logs'] + entity_log_sources = self._payload_logs(payload) for log_source, log_source_attributes in all_config_logs.iteritems(): source_pieces = log_source.split(':') category = source_pieces[0] if category in entity_log_sources: metadata[log_source] = log_source_attributes + return metadata - def classify_record(self, data): + def classify_record(self, payload, data): """Classify and type raw record passed into StreamAlert. Before we apply our rules to a record passed to the lambda function, @@ -185,61 +193,66 @@ def classify_record(self, data): the record's data source and parsing its data type. Args: - data (str): a raw record to classify + payload: A StreamAlert payload object + data: Pre parsed data string from a raw_event to be parsed """ - if self.log_metadata: - parse_result = self._parse(data) - if all([ - parse_result, - self.service, - self.entity, - self.type, - self.log_source, - self.record - ]): - self.valid = True - - def _parse(self, data): + parse_result = self._parse(payload, data) + if all([parse_result, + payload.service, + payload.entity, + payload.type, + payload.log_source, + payload.record]): + payload.valid = True + + def _parse(self, payload, data): """Parse a record into a declared type. Args: - data (str): A decoded data string from the event record. + payload: A StreamAlert payload object + data: Pre parsed data string from a raw_event to be parsed Sets: - self.log_source: The detected log name from the data_sources config. - self.type: The record's type. - self.record: The parsed record. + payload.log_source: The detected log name from the data_sources config. + payload.type: The record's type. + payload.record: The parsed record. Returns: A boolean representing the success of the parse. """ - for log_name, attributes in self.log_metadata.iteritems(): - if not self.type: + logger.debug(data) + + for log_name, attributes in self.log_metadata(payload).iteritems(): + if not payload.type: parser_name = attributes['parser'] else: - parser_name = self.type + parser_name = payload.type options = {} options['hints'] = attributes.get('hints') options['delimiter'] = attributes.get('delimiter') options['separator'] = attributes.get('separator') options['parser'] = parser_name - options['service'] = self.service + options['service'] = payload.service schema = attributes['schema'] parser_class = get_parser(parser_name) parser = parser_class(data, schema, options) parsed_data = parser.parse() - logger.debug('log_name: %s', log_name) + # Used for short circuiting parser determination + if parser.payload_type: + payload.type = parser.payload_type + + logger.debug('log name: %s', log_name) logger.debug('parsed_data: %s', parsed_data) if parsed_data: parsed_and_typed_data = self._convert_type(parsed_data, schema, options) if parsed_and_typed_data: - self.log_source = log_name - self.type = parser_name - self.record = parsed_and_typed_data + payload.log_source = log_name + payload.type = parser_name + payload.record = parsed_and_typed_data return True return False @@ -251,11 +264,12 @@ def _convert_type(self, parsed_data, schema, options): invalid. Args: - payload (dict): parsed payload object - options (dict): log type schema denoting keys with their value types + parsed_data: parsed dict payload + schema: data schema for a specific log source + options: parser options dict Returns: - (dict) parsed payload with typed values + parsed dict payload with typed values """ payload = parsed_data for key, value in schema.iteritems(): From 6c74b3b636a61f4504bc7451e6bb464e430a9b72 Mon Sep 17 00:00:00 2001 From: Jack Naglieri Date: Mon, 13 Feb 2017 11:59:03 -0800 Subject: [PATCH 5/9] [testing] update unit tests per refactor --- test/unit/conf/sources.json | 6 +- test/unit/test_classifier.py | 234 ++++++++++++++++----------------- test/unit/test_rules_engine.py | 15 ++- 3 files changed, 127 insertions(+), 128 deletions(-) diff --git a/test/unit/conf/sources.json b/test/unit/conf/sources.json index 16c2304b0..59b178bdf 100644 --- a/test/unit/conf/sources.json +++ b/test/unit/conf/sources.json @@ -8,14 +8,14 @@ "test_log_type_json_nested_with_data", "test_log_type_csv", "test_log_type_csv_nested", - "test_log_type_kv_auditd", - "test_log_type_syslog" + "test_log_type_kv_auditd" ] }, "test_stream_2": { "logs": [ "test_log_type_json_2", - "test_log_type_json_nested_osquery" + "test_log_type_json_nested_osquery", + "test_log_type_syslog" ] } } diff --git a/test/unit/test_classifier.py b/test/unit/test_classifier.py index 0982fa60a..c45c98148 100644 --- a/test/unit/test_classifier.py +++ b/test/unit/test_classifier.py @@ -24,7 +24,7 @@ from nose.tools import assert_equal, assert_not_equal, nottest -from stream_alert.classifier import StreamPayload +from stream_alert.classifier import StreamPayload, StreamClassifier from stream_alert.pre_parsers import StreamPreParsers from stream_alert.config import load_config @@ -57,6 +57,16 @@ def make_kinesis_record(**kwargs): } return raw_record + def payload_generator(self, **kwargs): + """Given raw data, return a payload object""" + kinesis_stream = kwargs['kinesis_stream'] + kinesis_data = kwargs['kinesis_data'] + kinesis_record = self.make_kinesis_record(kinesis_stream=kinesis_stream, + kinesis_data=kinesis_data) + + payload = StreamPayload(raw_record=kinesis_record) + return payload + def setup(self): """Setup before each method""" self.env = { @@ -72,44 +82,37 @@ def teardown(self): """Teardown after each method""" pass + def test_refresh_record(self): - kinesis_data = { + kinesis_data = json.dumps({ 'key3': 'key3data', 'key2': 'key2data', 'key1': 'key1data' - } - kinesis_data_json = json.dumps(kinesis_data) - first_raw_record = self.make_kinesis_record(kinesis_stream='test_kinesis_stream', - kinesis_data=kinesis_data_json) - payload = StreamPayload(raw_record=first_raw_record, - env=self.env, - config=self.config) + }) + payload = self.payload_generator(kinesis_stream='test_kinesis_stream', + kinesis_data=kinesis_data) # generate a second record - new_kinesis_data = { + new_kinesis_data = json.dumps({ 'key4': 'key4data', 'key5': 'key5data', 'key6': 'key6data' - } - new_kinesis_data_json = json.dumps(new_kinesis_data) + }) second_record = self.make_kinesis_record(kinesis_stream='test_kinesis_stream', - kinesis_data=new_kinesis_data_json) + kinesis_data=new_kinesis_data) payload.refresh_record(second_record) - # check loaded record + # check newly loaded record assert_equal(payload.raw_record, second_record) - assert_not_equal(payload.raw_record, first_raw_record) - def test_map_source(self): - data = 'test_map_source data' - data_encoded = base64.b64encode(data) - raw_record = self.make_kinesis_record(kinesis_stream='test_kinesis_stream', - kinesis_data=data_encoded) - payload = StreamPayload(raw_record=raw_record, - env=self.env, - config=self.config) - payload.map_source() + def test_map_source_1(self): + data_encoded = base64.b64encode('test_map_source data') + payload = self.payload_generator(kinesis_stream='test_kinesis_stream', + kinesis_data=data_encoded) + + classifier = StreamClassifier(config=self.config) + classifier.map_source(payload) test_kinesis_stream_logs = { 'test_log_type_json', @@ -118,51 +121,51 @@ def test_map_source(self): 'test_log_type_json_nested_with_data', 'test_log_type_csv', 'test_log_type_csv_nested', - 'test_log_type_kv_auditd', - 'test_log_type_syslog' + 'test_log_type_kv_auditd' } - + metadata = classifier.log_metadata(payload) + # service, entity, metadata test assert_equal(payload.service, 'kinesis') assert_equal(payload.entity, 'test_kinesis_stream') - assert_equal(set(payload.log_metadata.keys()), test_kinesis_stream_logs) + assert_equal(set(metadata.keys()), test_kinesis_stream_logs) - # second stream test - raw_record_2 = self.make_kinesis_record(kinesis_stream='test_stream_2', - kinesis_data=data_encoded) - payload_2 = StreamPayload(raw_record=raw_record_2, - env=self.env, - config=self.config) - payload_2.map_source() + def test_map_source_2(self): + data_encoded = base64.b64encode('test_map_source_data_2') + payload = self.payload_generator(kinesis_stream='test_stream_2', + kinesis_data=data_encoded) + + classifier = StreamClassifier(config=self.config) + classifier.map_source(payload) test_stream_2_logs = { 'test_log_type_json_2', - 'test_log_type_json_nested_osquery' + 'test_log_type_json_nested_osquery', + 'test_log_type_syslog' } + metadata = classifier.log_metadata(payload) # service, entity, metadata test - assert_equal(payload_2.service, 'kinesis') - assert_equal(payload_2.entity, 'test_stream_2') - assert_equal(set(payload_2.log_metadata.keys()), test_stream_2_logs) + assert_equal(payload.service, 'kinesis') + assert_equal(payload.entity, 'test_stream_2') + assert_equal(set(metadata.keys()), test_stream_2_logs) + def test_classify_record_kinesis_json(self): - kinesis_data = { + kinesis_data = json.dumps({ 'key1': 'sample data!!!!', 'key2': 'more sample data', 'key3': '1' - } - kinesis_data_json = json.dumps(kinesis_data) - raw_record = self.make_kinesis_record(kinesis_stream='test_kinesis_stream', - kinesis_data=kinesis_data_json) - - payload = StreamPayload(raw_record=raw_record, - env=self.env, - config=self.config) - payload.map_source() + }) + payload = self.payload_generator(kinesis_stream='test_kinesis_stream', + kinesis_data=kinesis_data) + classifier = StreamClassifier(config=self.config) + classifier.map_source(payload) + # pre parse and classify data = self.pre_parse_kinesis(payload) - payload.classify_record(data) + classifier.classify_record(payload, data) # valid record test assert_equal(payload.valid, True) @@ -180,8 +183,9 @@ def test_classify_record_kinesis_json(self): assert_equal(type(payload.record['key2']), str) assert_equal(type(payload.record['key3']), int) + def test_classify_record_kinesis_nested_json(self): - kinesis_data = { + kinesis_data = json.dumps({ 'date': 'Jan 01 2017', 'unixtime': '1485556524', 'host': 'my-host-name', @@ -189,18 +193,14 @@ def test_classify_record_kinesis_nested_json(self): 'key1': 'test', 'key2': 'one' } - } - kinesis_data_json = json.dumps(kinesis_data) - raw_record = self.make_kinesis_record(kinesis_stream='test_kinesis_stream', - kinesis_data=kinesis_data_json) - - payload = StreamPayload(raw_record=raw_record, - env=self.env, - config=self.config) - payload.map_source() + }) + payload = self.payload_generator(kinesis_stream='test_kinesis_stream', + kinesis_data=kinesis_data) + classifier = StreamClassifier(config=self.config) + classifier.map_source(payload) data = self.pre_parse_kinesis(payload) - payload.classify_record(data) + classifier.classify_record(payload, data) # valid record test assert_equal(payload.valid, True) @@ -222,8 +222,9 @@ def test_classify_record_kinesis_nested_json(self): assert_equal(payload.record['date'], 'Jan 01 2017') assert_equal(payload.record['data']['key1'], 'test') + def test_classify_record_kinesis_nested_json_osquery(self): - kinesis_data = { + kinesis_data = json.dumps({ 'name': 'testquery', 'hostIdentifier': 'host1.test.prod', 'calendarTime': 'Jan 01 2017', @@ -239,18 +240,15 @@ def test_classify_record_kinesis_nested_json_osquery(self): 'cluster': 'eu-east', 'number': '100' } - } - kinesis_data_json = json.dumps(kinesis_data) - raw_record = self.make_kinesis_record(kinesis_stream='test_stream_2', - kinesis_data=kinesis_data_json) + }) + payload = self.payload_generator(kinesis_stream='test_stream_2', + kinesis_data=kinesis_data) - payload = StreamPayload(raw_record=raw_record, - env=self.env, - config=self.config) - payload.map_source() + classifier = StreamClassifier(config=self.config) + classifier.map_source(payload) data = self.pre_parse_kinesis(payload) - payload.classify_record(data) + classifier.classify_record(payload, data) # valid record test assert_equal(payload.valid, True) @@ -275,8 +273,9 @@ def test_classify_record_kinesis_nested_json_osquery(self): assert_equal(payload.record['decorations']['cluster'], 'eu-east') assert_equal(payload.record['decorations']['number'], 100) + def test_classify_record_kinesis_nested_json_missing_subkey_fields(self): - kinesis_data = { + kinesis_data = json.dumps({ 'name': 'testquery', 'hostIdentifier': 'host1.test.prod', 'calendarTime': 'Jan 01 2017', @@ -292,25 +291,23 @@ def test_classify_record_kinesis_nested_json_missing_subkey_fields(self): # 'cluster': 'eu-east', 'number': '100' } - } - kinesis_data_json = json.dumps(kinesis_data) - raw_record = self.make_kinesis_record(kinesis_stream='test_stream_2', - kinesis_data=kinesis_data_json) + }) + payload = self.payload_generator(kinesis_stream='test_stream_2', + kinesis_data=kinesis_data) - payload = StreamPayload(raw_record=raw_record, - env=self.env, - config=self.config) - payload.map_source() + classifier = StreamClassifier(config=self.config) + classifier.map_source(payload) data = self.pre_parse_kinesis(payload) - payload.classify_record(data) + classifier.classify_record(payload, data) # invalid record test assert_equal(payload.valid, False) assert_equal(payload.record, None) + def test_classify_record_kinesis_nested_json_with_data(self): - kinesis_data = { + kinesis_data = json.dumps({ 'date': 'Jan 01 2017', 'unixtime': '1485556524', 'host': 'host1', @@ -321,18 +318,15 @@ def test_classify_record_kinesis_nested_json_with_data(self): 'type': '1', 'source': 'dev-app-1' } - } - kinesis_data_json = json.dumps(kinesis_data) - raw_record = self.make_kinesis_record(kinesis_stream='test_kinesis_stream', - kinesis_data=kinesis_data_json) + }) + payload = self.payload_generator(kinesis_stream='test_kinesis_stream', + kinesis_data=kinesis_data) - payload = StreamPayload(raw_record=raw_record, - env=self.env, - config=self.config) - payload.map_source() + classifier = StreamClassifier(config=self.config) + classifier.map_source(payload) data = self.pre_parse_kinesis(payload) - payload.classify_record(data) + classifier.classify_record(payload, data) # valid record test assert_equal(payload.valid, True) @@ -356,16 +350,17 @@ def test_classify_record_kinesis_nested_json_with_data(self): assert_equal(payload.record['date'], 'Jan 01 2017') assert_equal(payload.record['data']['source'], 'dev-app-1') + def test_classify_record_kinesis_csv(self): csv_data = 'jan102017,0100,host1,thisis some data with keyword1 in it' - raw_record = self.make_kinesis_record(kinesis_stream='test_kinesis_stream', + payload = self.payload_generator(kinesis_stream='test_kinesis_stream', kinesis_data=csv_data) - payload = StreamPayload(raw_record=raw_record, - env=self.env, - config=self.config) - payload.map_source() + + classifier = StreamClassifier(config=self.config) + classifier.map_source(payload) + data = self.pre_parse_kinesis(payload) - payload.classify_record(data) + classifier.classify_record(payload, data) # valid record test assert_equal(payload.valid, True) @@ -382,19 +377,20 @@ def test_classify_record_kinesis_csv(self): # log source test assert_equal(payload.log_source, 'test_log_type_csv') + def test_classify_record_kinesis_csv_nested(self): csv_nested_data = ( '"Jan 10 2017","1485635414","host1.prod.test","Corp",' '"chef,web-server,1,10,success"' ) - raw_record = self.make_kinesis_record(kinesis_stream='test_kinesis_stream', - kinesis_data=csv_nested_data) - payload = StreamPayload(raw_record=raw_record, - env=self.env, - config=self.config) - payload.map_source() + payload = self.payload_generator(kinesis_stream='test_kinesis_stream', + kinesis_data=csv_nested_data) + + classifier = StreamClassifier(config=self.config) + classifier.map_source(payload) + data = self.pre_parse_kinesis(payload) - payload.classify_record(data) + classifier.classify_record(payload, data) # valid record test assert_equal(payload.valid, True) @@ -414,6 +410,7 @@ def test_classify_record_kinesis_csv_nested(self): # log source test assert_equal(payload.log_source, 'test_log_type_csv_nested') + def test_classify_record_kinesis_kv(self): auditd_test_data = ( 'type=SYSCALL msg=audit(1364481363.243:24287): ' @@ -429,14 +426,14 @@ def test_classify_record_kinesis_kv(self): 'rdev=00:00 obj=system_u:object_r:etc_t:s0' ) - raw_record = self.make_kinesis_record(kinesis_stream='test_kinesis_stream', - kinesis_data=auditd_test_data) - payload = StreamPayload(raw_record=raw_record, - env=self.env, - config=self.config) - payload.map_source() + payload = self.payload_generator(kinesis_stream='test_kinesis_stream', + kinesis_data=auditd_test_data) + + classifier = StreamClassifier(config=self.config) + classifier.map_source(payload) + data = self.pre_parse_kinesis(payload) - payload.classify_record(data) + classifier.classify_record(payload, data) # valid record test assert_equal(payload.valid, True) @@ -453,6 +450,7 @@ def test_classify_record_kinesis_kv(self): assert_not_equal(payload.type, 'csv') assert_not_equal(payload.type, 'json') + def test_classify_record_syslog(self): test_data_1 = ( 'Jan 26 19:35:33 vagrant-ubuntu-trusty-64 ' @@ -468,14 +466,14 @@ def test_classify_record_syslog(self): fixtures = {'test_1': test_data_1, 'test_2': test_data_2} for name, syslog_message in fixtures.iteritems(): - raw_record = self.make_kinesis_record(kinesis_stream='test_kinesis_stream', - kinesis_data=syslog_message) - payload = StreamPayload(raw_record=raw_record, - env=self.env, - config=self.config) - payload.map_source() + payload = self.payload_generator(kinesis_stream='test_stream_2', + kinesis_data=syslog_message) + + classifier = StreamClassifier(config=self.config) + classifier.map_source(payload) + data = self.pre_parse_kinesis(payload) - payload.classify_record(data) + classifier.classify_record(payload, data) # valid record test assert_equal(payload.valid, True) diff --git a/test/unit/test_rules_engine.py b/test/unit/test_rules_engine.py index 1eeaf71fa..baa637b7f 100644 --- a/test/unit/test_rules_engine.py +++ b/test/unit/test_rules_engine.py @@ -19,7 +19,7 @@ from nose.tools import assert_equal, assert_not_equal, nottest -from stream_alert.classifier import StreamPayload +from stream_alert.classifier import StreamPayload, StreamClassifier from stream_alert.pre_parsers import StreamPreParsers from stream_alert.config import load_config from stream_alert.rules_engine import StreamRules @@ -67,12 +67,13 @@ def make_kinesis_payload(self, **kwargs): 'data': base64.b64encode(kinesis_data) } } - payload = StreamPayload(raw_record=raw_record, - env=self.env, - config=self.config) - payload.map_source() + payload = StreamPayload(raw_record=raw_record) + classifier = StreamClassifier(config=self.config) + + classifier.map_source(payload) data = self.pre_parse_kinesis(payload) - payload.classify_record(data) + classifier.classify_record(payload, data) + if payload.valid: return payload @@ -200,7 +201,7 @@ def syslog_sudo(rec): 'session opened for user root by (uid=0)' ) # prepare the payloads - payload = self.make_kinesis_payload(kinesis_stream='test_kinesis_stream', + payload = self.make_kinesis_payload(kinesis_stream='test_stream_2', kinesis_data=kinesis_data) # process payloads From ec3f9b17bf49d974b1ee4891d0215dc19bc76ee9 Mon Sep 17 00:00:00 2001 From: Jack Naglieri Date: Mon, 13 Feb 2017 11:59:45 -0800 Subject: [PATCH 6/9] [TODO] add comment to remove alert formatting --- stream_alert_output/main.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/stream_alert_output/main.py b/stream_alert_output/main.py index d81b0aaff..920ddc070 100644 --- a/stream_alert_output/main.py +++ b/stream_alert_output/main.py @@ -114,6 +114,8 @@ def run(self, alerts): else: logger.error('Declared output [%s] does not exist', output) + # TODO(jacknagz) remove this - we don't need whitelist keys anymore now + # that the payload attributes are fixed def _format_alert(self, alert): """Alert formatter and shortener From 95f801908c4a51b846265a5e75c710ece03ed950 Mon Sep 17 00:00:00 2001 From: Jack Naglieri Date: Mon, 13 Feb 2017 14:20:56 -0800 Subject: [PATCH 7/9] [config] more error handling with unit test cases * add error handing for improper JSON files * fix a bug in the return statement spacing where it was skipping checks * add unit test cases for validating configs --- CONTRIBUTING.rst | 1 + stream_alert/config.py | 9 ++- test/unit/test_config.py | 125 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 132 insertions(+), 3 deletions(-) create mode 100644 test/unit/test_config.py diff --git a/CONTRIBUTING.rst b/CONTRIBUTING.rst index af4a358e4..35372d200 100644 --- a/CONTRIBUTING.rst +++ b/CONTRIBUTING.rst @@ -42,6 +42,7 @@ When writing commit messages, make sure to prefix with one of the following tags [core] # changes with core stream_alert classes used across both functions [testing] # changes with testing infrastructure or processes [setup] # StreamAlert development setup changes + [config] # stream_alert config changes The first line of your commit message should be short. Use newlines to explain further:: diff --git a/stream_alert/config.py b/stream_alert/config.py index d7b11be4b..3bf4813a0 100644 --- a/stream_alert/config.py +++ b/stream_alert/config.py @@ -41,7 +41,10 @@ def load_config(conf_dir='conf/'): config = {} for desc, filename in conf_files.iteritems(): with open(os.path.join(conf_dir, filename)) as data: - config[desc] = json.load(data, object_pairs_hook=OrderedDict) + try: + config[desc] = json.load(data, object_pairs_hook=OrderedDict) + except ValueError: + raise ConfigError('Invalid JSON format for {}.json'.format(desc)) if validate_config(config): return config @@ -56,12 +59,12 @@ def validate_config(config): - each sources has a list of logs declared """ for config_key, settings in config.iteritems(): - # TODO(jacknagz): add specific unit test cases for this method # check log declarations if config_key == 'logs': for log, attrs in settings.iteritems(): if not {'schema', 'parser'}.issubset(set(attrs.keys())): raise ConfigError('Schema or parser missing for {}'.format(log)) + # check sources attributes elif config_key == 'sources': if not set(settings.keys()).issubset(set(['kinesis', 's3'])): @@ -73,7 +76,7 @@ def validate_config(config): if len(entity_attrs['logs']) == 0: raise ConfigError('Log list is empty for {}'.format(entity)) - return True + return True def load_env(context): """Get the current environment for the running Lambda function. diff --git a/test/unit/test_config.py b/test/unit/test_config.py new file mode 100644 index 000000000..4cd4cdf20 --- /dev/null +++ b/test/unit/test_config.py @@ -0,0 +1,125 @@ +''' +Copyright 2017-present, Airbnb Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +''' + +# command: nosetests -v -s test/unit/ +# specific test: nosetests -v -s test/unit/file.py:TestStreamPayload.test_name + +import base64 +import json + +from collections import OrderedDict + +from nose.tools import ( + assert_equal, + assert_not_equal, + nottest, + assert_raises, + raises +) + +from stream_alert.config import ( + ConfigError, + load_config, + validate_config +) + +def test_validate_config_valid(): + config = { + 'logs': { + 'json_log': { + 'schema': { + 'name': 'string' + }, + 'parser': 'json' + }, + 'csv_log': { + 'schema': { + 'data': 'string', + 'uid': 'integer' + }, + 'parser': 'csv' + } + }, + 'sources': { + 'kinesis': { + 'stream_1': { + 'logs': [ + 'json_log', + 'csv_log' + ] + } + } + } + } + + validate_result = validate_config(config) + assert_equal(validate_result, True) + +@raises(ConfigError) +def test_validate_config_no_parsers(): + config = { + 'logs': { + 'json_log': { + 'schema': { + 'name': 'string' + } + }, + 'csv_log': { + 'schema': { + 'data': 'string', + 'uid': 'integer' + } + } + }, + 'sources': { + 'kinesis': { + 'stream_1': { + 'logs': [ + 'json_log', + 'csv_log' + ] + } + } + } + } + + validate_result = validate_config(config) + +@raises(ConfigError) +def test_validate_config_no_logs(): + config = { + 'logs': { + 'json_log': { + 'schema': { + 'name': 'string' + } + }, + 'csv_log': { + 'schema': { + 'data': 'string', + 'uid': 'integer' + } + } + }, + 'sources': { + 'kinesis': { + 'stream_1': {} + } + } + } + + validate_result = validate_config(config) + \ No newline at end of file From 38ba93190781264ca91996f3be9848f2a0b326e9 Mon Sep 17 00:00:00 2001 From: Jack Naglieri Date: Tue, 14 Feb 2017 10:39:52 -0800 Subject: [PATCH 8/9] [testing] sample fixtures/rules/conf, test runner scripts, and more * Add functional test fixtures, rules, and conf for users to work off of * Fix a bug in the CLI test to strip all records of newline characters * Abstract the testing commands into shell scripts to be used by devs and Travis CI --- .gitignore | 7 +- .travis.yml | 3 +- conf/{sample_logs.json => logs.json} | 22 +++--- conf/sample_sources.json | 29 -------- conf/sources.json | 24 +++++++ rules/sample_rules.py | 35 ++++++++++ stream_alert_cli/test.py | 67 ++++++++++--------- .../kinesis_stream_1/non_trigger_events.json | 0 .../kinesis_stream_1/trigger_events.json | 0 .../non_trigger_events.json | 1 + .../trigger_events.json | 4 ++ .../fixtures/out/kinesis_record_events.json | 39 +++++++++++ test/scripts/integration_test_kinesis.sh | 2 + test/scripts/unit_tests.sh | 2 + 14 files changed, 161 insertions(+), 74 deletions(-) rename conf/{sample_logs.json => logs.json} (65%) delete mode 100644 conf/sample_sources.json create mode 100644 conf/sources.json delete mode 100644 test/integration/fixtures/kinesis/kinesis_stream_1/non_trigger_events.json delete mode 100644 test/integration/fixtures/kinesis/kinesis_stream_1/trigger_events.json create mode 100644 test/integration/fixtures/kinesis/prefix_cluster1_stream_alert_kinesis/non_trigger_events.json create mode 100644 test/integration/fixtures/kinesis/prefix_cluster1_stream_alert_kinesis/trigger_events.json create mode 100644 test/integration/fixtures/out/kinesis_record_events.json create mode 100755 test/scripts/integration_test_kinesis.sh create mode 100755 test/scripts/unit_tests.sh diff --git a/.gitignore b/.gitignore index 5c92c26dd..18b0afe72 100644 --- a/.gitignore +++ b/.gitignore @@ -1,7 +1,7 @@ # Sphinx build directory -build +docs/build -# PYC files +# Compiled Python files *.pyc # Terraform files @@ -14,3 +14,6 @@ Thumbs.db .DS_Store *.swp terminal.glue + +# nose coverage file +.coverage diff --git a/.travis.yml b/.travis.yml index 183de301c..e241fb2e3 100644 --- a/.travis.yml +++ b/.travis.yml @@ -6,4 +6,5 @@ before_install: install: - pip install -r requirements.txt script: - - nosetests -v -s test/unit/ \ No newline at end of file + - ./test/scripts/unit_tests.sh + - ./test/scripts/integration_test_kinesis.sh diff --git a/conf/sample_logs.json b/conf/logs.json similarity index 65% rename from conf/sample_logs.json rename to conf/logs.json index 3858753f7..c6055dce1 100644 --- a/conf/sample_logs.json +++ b/conf/logs.json @@ -1,19 +1,15 @@ -/* -This is a sample! Copy and rename this file to `logs.json` in the same folder. -Below you will find a sample log for each parser type. -*/ { - "json_log_name": { + "json_log": { "schema": { "name": "string", - "host": "integer", + "host": "string", "data": { "time": "string" } }, "parser": "json" }, - "syslog_log_name": { + "syslog_log": { "schema": { "timestamp": "string", "host": "string", @@ -22,18 +18,22 @@ Below you will find a sample log for each parser type. }, "parser": "syslog" }, - "csv_log_name": { + "csv_log": { "schema": { "date": "string", "time": "integer", "host": "string", - "message": "string" + "message": "string", + "source": "string" }, + "parser": "csv", "hints": { - "message": ["*keyword*"] + "source": [ + "cluster *" + ] } }, - "kv_log_name": { + "kv_log": { "schema": { "type": "string", "msg": "string", diff --git a/conf/sample_sources.json b/conf/sample_sources.json deleted file mode 100644 index 916a6683c..000000000 --- a/conf/sample_sources.json +++ /dev/null @@ -1,29 +0,0 @@ -/* -This is a sample! Copy and rename this file to `sources.json` in the same folder. -Below you will find a sample sources mapping for each log. - -To get the name of the kinesis stream, type: -$ ./stream_alert_cli.py terraform status -*/ -{ - "kinesis": { - "prefix_cluster1_stream_alert_kinesis": { - "logs": [ - "json_log_name", - "csv_log_name" - ] - }, - "prefix_cluster2_stream_alert_kinesis": { - "logs": [ - "json_log_name" - ] - }, - }, - "s3": { - "my-s3-bucket-id": { - "logs": [ - "syslog_log_name" - ] - } - } -} \ No newline at end of file diff --git a/conf/sources.json b/conf/sources.json new file mode 100644 index 000000000..0abb9a7f6 --- /dev/null +++ b/conf/sources.json @@ -0,0 +1,24 @@ +{ + "kinesis": { + "prefix_cluster1_stream_alert_kinesis": { + "logs": [ + "json_log", + "syslog_log", + "kv_log", + "csv_log" + ] + }, + "prefix_cluster2_stream_alert_kinesis": { + "logs": [ + "json_log" + ] + } + }, + "s3": { + "my-s3-bucket-id": { + "logs": [ + "syslog_log" + ] + } + } +} \ No newline at end of file diff --git a/rules/sample_rules.py b/rules/sample_rules.py index ba03af210..c696d1250 100644 --- a/rules/sample_rules.py +++ b/rules/sample_rules.py @@ -43,3 +43,38 @@ def invalid_subnet_rule(rec): def rule_func(rec): """Description""" return True + + +@rule('sample_json_rule', + logs=['json_log'], + matchers=[], + outputs=['s3']) +def sample_json_rule(rec): + return rec['host'] == 'test-host-1' + + +@rule('sample_syslog_rule', + logs=['syslog_log'], + matchers=[], + outputs=['pagerduty']) +def sample_syslog_rule(rec): + return rec['application'] == 'sudo' + + +@rule('sample_csv_rule', + logs=['csv_log'], + matchers=[], + outputs=['s3']) +def sample_csv_rule(rec): + return rec['host'] == 'test-host-2' + + +@rule('sample_kv_rule', + logs=['kv_log'], + matchers=[], + outputs=['s3']) +def sample_kv_rule(rec): + return ( + rec['msg'] == 'fatal' and + rec['uid'] == 100 + ) diff --git a/stream_alert_cli/test.py b/stream_alert_cli/test.py index 4a9309d2f..e7713a6d7 100644 --- a/stream_alert_cli/test.py +++ b/stream_alert_cli/test.py @@ -64,6 +64,7 @@ def read_kinesis_records(local_directories): with open(os.path.join(root, json_file), 'r') as json_fh: lines = json_fh.readlines() for line in lines: + line = line.strip() record = { 'kinesis': {'data': base64.b64encode(line)}, 'eventSource': 'aws:{}'.format(folder), @@ -71,6 +72,7 @@ def read_kinesis_records(local_directories): .format(folder, root.split('/')[-1]) } records['Records'].append(record) + return records def read_s3_records(local_directories): @@ -121,10 +123,11 @@ def read_s3_records(local_directories): records = {'Records': []} for folder in local_directories: for root, _, files in os.walk(os.path.join(BASEFOLDER, folder)): - for json_file in files: - with open(os.path.join(root, json_file), 'r') as json_fh: - lines = json_fh.readlines() + for test_file in files: + with open(os.path.join(root, test_file), 'r') as test_file_fh: + lines = test_file_fh.readlines() for line in lines: + line = line.strip() # provide a way to skip records if line[0] == '#': continue @@ -135,6 +138,7 @@ def read_s3_records(local_directories): record['awsRegion'] = 'us-east-1' record['eventName'] = 'ObjectCreated:Put' records['Records'].append(record) + return records def format_sns(in_file): @@ -144,37 +148,38 @@ def format_sns(in_file): message = base64.b64encode(json.dumps(in_file_contents)) out_records = { "Records": [ - { - "EventVersion": "1.0", - "EventSubscriptionArn": "arn:aws:sns:EXAMPLE", - "EventSource": "aws:sns", - "Sns": { - "SignatureVersion": "1", - "Timestamp": "1970-01-01T00:00:00.000Z", - "Signature": "EXAMPLE", - "SigningCertUrl": "EXAMPLE", - "MessageId": "95df01b4-ee98-5cb9-9903-4c221d41eb5e", - "Message": message, - "MessageAttributes": { - "Test": { - "Type": "String", - "Value": "TestString" - }, - "TestBinary": { - "Type": "Binary", - "Value": "TestBinary" + { + "EventVersion": "1.0", + "EventSubscriptionArn": "arn:aws:sns:EXAMPLE", + "EventSource": "aws:sns", + "Sns": { + "SignatureVersion": "1", + "Timestamp": "1970-01-01T00:00:00.000Z", + "Signature": "EXAMPLE", + "SigningCertUrl": "EXAMPLE", + "MessageId": "95df01b4-ee98-5cb9-9903-4c221d41eb5e", + "Message": message, + "MessageAttributes": { + "Test": { + "Type": "String", + "Value": "TestString" + }, + "TestBinary": { + "Type": "Binary", + "Value": "TestBinary" + } + }, + "Type": "Notification", + "UnsubscribeUrl": "EXAMPLE", + "TopicArn": "arn:aws:sns:EXAMPLE", + "Subject": "TestInvoke" } - }, - "Type": "Notification", - "UnsubscribeUrl": "EXAMPLE", - "TopicArn": "arn:aws:sns:EXAMPLE", - "Subject": "TestInvoke" - } + } + ] } - ] - } out_file = '{}.out'.format(in_file) write_records(out_records, out_file) + return out_file def write_records(records, out_file): @@ -190,7 +195,7 @@ def write_records(records, out_file): def stream_alert_test(options): def alert_emulambda(out_file): - context_file = os.path.join(BASEFOLDER, 'context') + # context_file = os.path.join(BASEFOLDER, 'context') sys.argv = ['emulambda', 'main.handler', out_file, '-v'] import emulambda emulambda.main() diff --git a/test/integration/fixtures/kinesis/kinesis_stream_1/non_trigger_events.json b/test/integration/fixtures/kinesis/kinesis_stream_1/non_trigger_events.json deleted file mode 100644 index e69de29bb..000000000 diff --git a/test/integration/fixtures/kinesis/kinesis_stream_1/trigger_events.json b/test/integration/fixtures/kinesis/kinesis_stream_1/trigger_events.json deleted file mode 100644 index e69de29bb..000000000 diff --git a/test/integration/fixtures/kinesis/prefix_cluster1_stream_alert_kinesis/non_trigger_events.json b/test/integration/fixtures/kinesis/prefix_cluster1_stream_alert_kinesis/non_trigger_events.json new file mode 100644 index 000000000..880bdbea8 --- /dev/null +++ b/test/integration/fixtures/kinesis/prefix_cluster1_stream_alert_kinesis/non_trigger_events.json @@ -0,0 +1 @@ +{"name": "name-1", "host": "test-host-2", "data": {"time": "Jan 01, 2017"}} \ No newline at end of file diff --git a/test/integration/fixtures/kinesis/prefix_cluster1_stream_alert_kinesis/trigger_events.json b/test/integration/fixtures/kinesis/prefix_cluster1_stream_alert_kinesis/trigger_events.json new file mode 100644 index 000000000..12cdbe731 --- /dev/null +++ b/test/integration/fixtures/kinesis/prefix_cluster1_stream_alert_kinesis/trigger_events.json @@ -0,0 +1,4 @@ +{"name": "name-1", "host": "test-host-1", "data": {"time": "Jan 01, 2017"}} +Jan 01 12:00:12 test-host-1 sudo[151]: COMMAND sudo rm /tmp/test +Jan 01 2017,1487095529,test-host-2,this is test data for rules,cluster 5 +type=comm msg=fatal uid=100 \ No newline at end of file diff --git a/test/integration/fixtures/out/kinesis_record_events.json b/test/integration/fixtures/out/kinesis_record_events.json new file mode 100644 index 000000000..55b984f43 --- /dev/null +++ b/test/integration/fixtures/out/kinesis_record_events.json @@ -0,0 +1,39 @@ +{ + "Records": [ + { + "eventSource": "aws:kinesis", + "eventSourceARN": "arn:aws:kinesis:region:account-id:stream/prefix_cluster1_stream_alert_kinesis", + "kinesis": { + "data": "eyJuYW1lIjogIm5hbWUtMSIsICJob3N0IjogInRlc3QtaG9zdC0yIiwgImRhdGEiOiB7InRpbWUiOiAiSmFuIDAxLCAyMDE3In19" + } + }, + { + "eventSource": "aws:kinesis", + "eventSourceARN": "arn:aws:kinesis:region:account-id:stream/prefix_cluster1_stream_alert_kinesis", + "kinesis": { + "data": "eyJuYW1lIjogIm5hbWUtMSIsICJob3N0IjogInRlc3QtaG9zdC0xIiwgImRhdGEiOiB7InRpbWUiOiAiSmFuIDAxLCAyMDE3In19" + } + }, + { + "eventSource": "aws:kinesis", + "eventSourceARN": "arn:aws:kinesis:region:account-id:stream/prefix_cluster1_stream_alert_kinesis", + "kinesis": { + "data": "SmFuIDAxIDEyOjAwOjEyIHRlc3QtaG9zdC0xIHN1ZG9bMTUxXTogQ09NTUFORCBzdWRvIHJtIC90bXAvdGVzdA==" + } + }, + { + "eventSource": "aws:kinesis", + "eventSourceARN": "arn:aws:kinesis:region:account-id:stream/prefix_cluster1_stream_alert_kinesis", + "kinesis": { + "data": "SmFuIDAxIDIwMTcsMTQ4NzA5NTUyOSx0ZXN0LWhvc3QtMix0aGlzIGlzIHRlc3QgZGF0YSBmb3IgcnVsZXMsY2x1c3RlciA1" + } + }, + { + "eventSource": "aws:kinesis", + "eventSourceARN": "arn:aws:kinesis:region:account-id:stream/prefix_cluster1_stream_alert_kinesis", + "kinesis": { + "data": "dHlwZT1jb21tIG1zZz1mYXRhbCB1aWQ9MTAw" + } + } + ] +} \ No newline at end of file diff --git a/test/scripts/integration_test_kinesis.sh b/test/scripts/integration_test_kinesis.sh new file mode 100755 index 000000000..b47cbc87d --- /dev/null +++ b/test/scripts/integration_test_kinesis.sh @@ -0,0 +1,2 @@ +#! /bin/bash +./stream_alert_cli.py lambda test --source kinesis --func alert \ No newline at end of file diff --git a/test/scripts/unit_tests.sh b/test/scripts/unit_tests.sh new file mode 100755 index 000000000..cd168ffba --- /dev/null +++ b/test/scripts/unit_tests.sh @@ -0,0 +1,2 @@ +#! /bin/bash +nosetests test/unit --with-coverage --cover-package=stream_alert \ No newline at end of file From f1daa05aeb2a1006a78848570d9cb77d70fd4710 Mon Sep 17 00:00:00 2001 From: Jack Naglieri Date: Tue, 14 Feb 2017 11:29:27 -0800 Subject: [PATCH 9/9] [cli] only check terraform when necessary * We don't need the Terraform binary to run Lambda commands (it uses the AWS CLI) * Resolves #31 --- stream_alert_cli/cli.py | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/stream_alert_cli/cli.py b/stream_alert_cli/cli.py index 7fa08880e..569641c9e 100644 --- a/stream_alert_cli/cli.py +++ b/stream_alert_cli/cli.py @@ -55,18 +55,10 @@ def run(self, options): 'https://github.com/airbnb/streamalert/issues') logging.info(cli_load_message) - prereqs_message = ('Terraform not found! Please install and add to' - 'your $PATH:\n' - '$ export PATH=$PATH:/usr/local/terraform/bin') - terraform_check = self.run_command(['terraform', 'version'], - error_message=prereqs_message, - quiet=True) - # There is no `else` since it's handled in the self.run_command call - if terraform_check: - if options.command == 'lambda': - self._lambda_runner(options) - elif options.command == 'terraform': - self._terraform_runner(options) + if options.command == 'lambda': + self._lambda_runner(options) + elif options.command == 'terraform': + self._terraform_runner(options) def _lambda_runner(self, options): """Handle all Lambda CLI operations.""" @@ -80,8 +72,17 @@ def _lambda_runner(self, options): elif options.subcommand == 'test': stream_alert_test(options) + def _terraform_check(self): + prereqs_message = ('Terraform not found! Please install and add to' + 'your $PATH:\n' + '$ export PATH=$PATH:/usr/local/terraform/bin') + self.run_command(['terraform', 'version'], + error_message=prereqs_message, + quiet=True) + def _terraform_runner(self, options): """Handle all Terraform CLI operations.""" + self._terraform_check() deploy_opts = namedtuple('deploy_opts', 'func, env') # plan and apply our terraform infrastructure