diff --git a/docs/source/conf-schemas.rst b/docs/source/conf-schemas.rst index fb7bf7782..2c9e63164 100644 --- a/docs/source/conf-schemas.rst +++ b/docs/source/conf-schemas.rst @@ -320,7 +320,7 @@ The resultant parsed records:: "example": 1, "host": "jumphost-1.domain.com", "time": "11:00 PM", - "envelope": { + "streamalert:envelope_keys": { "id": 1431948983198, "application": "my-app" } @@ -329,7 +329,7 @@ The resultant parsed records:: "example": 2, "host": "jumphost-2.domain.com", "time": "12:00 AM", - "envelope": { + "streamalert:envelope_keys": { "id": 1431948983198, "application": "my-app" } diff --git a/stream_alert/rule_processor/classifier.py b/stream_alert/rule_processor/classifier.py index 50120abb9..360f7dfe1 100644 --- a/stream_alert/rule_processor/classifier.py +++ b/stream_alert/rule_processor/classifier.py @@ -16,13 +16,17 @@ import logging -from collections import OrderedDict +from collections import namedtuple, OrderedDict from stream_alert.rule_processor.parsers import get_parser logging.basicConfig() LOGGER = logging.getLogger('StreamAlert') +# Set the below to True when we want to support matching on multiple schemas +# and then log_patterns will be used as a fall back for key/value matching +SUPPORT_MULTIPLE_SCHEMA_MATCHING = False + class InvalidSchemaError(Exception): """Raise this exception if a declared schema field type does not match the data passed.""" @@ -212,50 +216,47 @@ def _check_valid_parse(self, valid_parses): Returns: [tuple] The proper tuple to use for parsing from the list of tuples """ - if len(valid_parses) == 1: + # If there is only one parse or we do not have support for multiple schemas + # enabled, then just return the first parse that was valid + if len(valid_parses) == 1 or not SUPPORT_MULTIPLE_SCHEMA_MATCHING: return valid_parses[0] matched_parses = [] for i, valid_parse in enumerate(valid_parses): - parser = valid_parse[1] - for data in valid_parse[2]: - if parser.matched_log_pattern(data, parser.options.get('log_patterns', {})): - matched_parses.append(valid_parses[i]) - break - else: - LOGGER.debug('log pattern matching failed for schema: %s', parser.schema) + log_patterns = valid_parse.parser.options.get('log_patterns', {}) + if (all(valid_parse.parser.matched_log_pattern(data, log_patterns) + for data in valid_parse.parsed_data)): + matched_parses.append(valid_parses[i]) + else: + LOGGER.debug('log pattern matching failed for schema: %s', valid_parse.root_schema) if matched_parses: if len(matched_parses) > 1: LOGGER.error('log patterns matched for multiple schemas: %s', - ', '.join(name for name, _, _ in matched_parses)) - LOGGER.error('proceeding with schema for: %s', matched_parses[0][0]) + ', '.join(parse.log_name for parse in matched_parses)) + LOGGER.error('proceeding with schema for: %s', matched_parses[0].log_name) return matched_parses[0] - - LOGGER.error('log classification matched for multiple schemas: %s', - ', '.join(name for name, _, _ in valid_parses)) - LOGGER.error('proceeding with schema for: %s', valid_parses[0][0]) + ', '.join(parse.log_name for parse in valid_parses)) + LOGGER.error('proceeding with schema for: %s', valid_parses[0].log_name) return valid_parses[0] - def _parse(self, payload, data): - """Parse a record into a declared type. 
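# --- Illustrative sketch (not part of this patch) ---
# A minimal, standalone approximation of the tie-breaking idea used by
# _check_valid_parse above: when SUPPORT_MULTIPLE_SCHEMA_MATCHING is enabled
# and several schemas classify the same record, log_patterns act as the
# fall-back discriminator. The ClassifiedLog namedtuple mirrors the one built
# in _process_log_schemas; the 'parser' field is simplified here to a plain
# options dict and fnmatch stands in for matched_log_pattern -- both are
# assumptions for illustration only.
from collections import namedtuple
from fnmatch import fnmatch

ClassifiedLog = namedtuple('ClassifiedLog', 'log_name, root_schema, parser, parsed_data')

def pick_parse(valid_parses, support_multiple=True):
    """Return the single parse to proceed with, preferring log_pattern matches"""
    if len(valid_parses) == 1 or not support_multiple:
        return valid_parses[0]

    matched = []
    for parse in valid_parses:
        patterns = parse.parser.get('log_patterns', {})
        if all(fnmatch(str(record.get(field, '')), pattern)
               for record in parse.parsed_data
               for field, pattern_list in patterns.iteritems()
               for pattern in pattern_list):
            matched.append(parse)

    # Fall back to the first valid parse when log patterns do not disambiguate
    return (matched or valid_parses)[0]

# Example (assumed data): the first entry wins because only its pattern matches
#   pick_parse([
#       ClassifiedLog('schema_a', {}, {'log_patterns': {'type': ['*added*']}},
#                     [{'type': 'file_added_event'}]),
#       ClassifiedLog('schema_b', {}, {'log_patterns': {'type': ['*removed*']}},
#                     [{'type': 'file_added_event'}]),
#   ]).log_name  # -> 'schema_a'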
+ def _process_log_schemas(self, payload, data): + """Get any log schemas that matched this log format Args: payload: A StreamAlert payload object data: Pre parsed data string from a raw_event to be parsed - Sets: - payload.log_source: The detected log name from the data_sources config. - payload.type: The record's type. - payload.records: The parsed record. - Returns: - A boolean representing the success of the parse. + [list] A list containing any schemas that matched this log format + Each list entry contains the namedtuple of 'ClassifiedLog' with + values of log_name, root_schema, parser, and parsed_data """ + classified_log = namedtuple('ClassifiedLog', 'log_name, root_schema, parser, parsed_data') log_metadata = self._log_metadata() valid_parses = [] @@ -269,42 +270,60 @@ def _parse(self, payload, data): # Setup the parser class parser_class = get_parser(parser_name) - parser = parser_class(schema, options) + parser = parser_class(options) # Get a list of parsed records - parsed_data = parser.parse(data) + parsed_data = parser.parse(schema, data) LOGGER.debug('schema: %s', schema) - if parsed_data: - valid_parses.append((log_name, parser, parsed_data)) + if not parsed_data: + continue + + if SUPPORT_MULTIPLE_SCHEMA_MATCHING: + valid_parses.append(classified_log(log_name, schema, parser, parsed_data)) + continue + + log_patterns = parser.options.get('log_patterns') + if all(parser.matched_log_pattern(rec, log_patterns) for rec in parsed_data): + return [classified_log(log_name, schema, parser, parsed_data)] + + return valid_parses + + def _parse(self, payload, data): + """Parse a record into a declared type. + + Args: + payload: A StreamAlert payload object + data: Pre parsed data string from a raw_event to be parsed + + Sets: + payload.log_source: The detected log name from the data_sources config. + payload.type: The record's type. + payload.records: The parsed record. + + Returns: + A boolean representing the success of the parse. 
+ """ + valid_parses = self._process_log_schemas(payload, data) if not valid_parses: return False valid_parse = self._check_valid_parse(valid_parses) - log_name, parser, parsed_data = valid_parse[0], valid_parse[1], valid_parse[2] - LOGGER.debug('log_name: %s', log_name) - LOGGER.debug('parsed_data: %s', parsed_data) + LOGGER.debug('log_name: %s', valid_parse.log_name) + LOGGER.debug('parsed_data: %s', valid_parse.parsed_data) - typed_data = [] - for data in parsed_data: + for data in valid_parse.parsed_data: # Convert data types per the schema - # Use the parser.schema due to updates caused by - # configuration settings such as envelope and optional_keys - converted_data = self._convert_type(data, parser.type(), parser.schema, parser.options) - if not converted_data: - payload.valid = False - break + # Use the root schema for the parser due to updates caused by + # configuration settings such as envelope_keys and optional_keys + if not self._convert_type(data, valid_parse.parser.type(), valid_parse.root_schema, valid_parse.parser.options): + return False - typed_data.append(converted_data) - - if not typed_data: - return False - - payload.log_source = log_name - payload.type = parser.type() - payload.records = parsed_data + payload.log_source = valid_parse.log_name + payload.type = valid_parse.parser.type() + payload.records = valid_parse.parsed_data return True @@ -348,26 +367,19 @@ def _convert_type(self, payload, parser_type, schema, options): elif value == 'boolean': payload[key] = str(payload[key]).lower() == 'true' - elif isinstance(value, OrderedDict): - # allow for any value to exist in the map - if value: - # handle nested csv - # skip the 'stream_log_envelope' key that we've added during parsing - if key == 'stream_log_envelope' and isinstance(payload[key], dict): - continue + elif isinstance(value, dict): + if not value: + continue # allow empty maps (dict) - if 'log_patterns' in options: - options['log_patterns'] = options['log_patterns'][key] + # handle nested values + # skip the 'streamalert:envelope_keys' key that we've added during parsing + if key == 'streamalert:envelope_keys' and isinstance(payload[key], dict): + continue - sub_schema = schema[key] - parser = get_parser(parser_type)(sub_schema, options) - parsed_nested_key = parser.parse(payload[key]) + if 'log_patterns' in options: + options['log_patterns'] = options['log_patterns'][key] - # Call the first element since a list is returned - if parsed_nested_key: - payload[key] = parsed_nested_key[0] - - self._convert_type(payload[key], parser_type, sub_schema, options) + self._convert_type(payload[key], parser_type, schema[key], options) elif isinstance(value, list): pass @@ -375,4 +387,4 @@ def _convert_type(self, payload, parser_type, schema, options): else: LOGGER.error('Unsupported schema type: %s', value) - return payload + return True diff --git a/stream_alert/rule_processor/parsers.py b/stream_alert/rule_processor/parsers.py index 7104be809..94af2bc0c 100644 --- a/stream_alert/rule_processor/parsers.py +++ b/stream_alert/rule_processor/parsers.py @@ -52,18 +52,17 @@ class ParserBase: __metaclass__ = ABCMeta __parserid__ = '' - def __init__(self, schema, options): + def __init__(self, options): """Setup required parser properties Args: schema: Dict of log data schema. 
options: Parser options dict - delimiter, separator, or log_patterns """ - self.schema = schema self.options = options or {} @abstractmethod - def parse(self, data): + def parse(self, schema, data): """Main parser method to be overridden by all Parser classes Args: @@ -80,10 +79,10 @@ def type(self): def matched_log_pattern(self, record, log_patterns): """Return True if all log patterns of this record match""" - # Return False immediately if there are no log patterns + # Return True immediately if there are no log patterns # or if the data being tested is not a dict - if not (log_patterns and isinstance(record, dict)): - return False + if not log_patterns: + return True pattern_result = [] for field, pattern_list in log_patterns.iteritems(): @@ -116,44 +115,46 @@ def matched_log_pattern(self, record, log_patterns): class JSONParser(ParserBase): __parserid__ = 'json' - def _key_check(self, json_records): + def _key_check(self, schema, json_records): """Verify the declared schema matches the json payload + If keys do not match per the schema, records are removed from the + passed in json_records list + Args: json_records [list]: List of dictionaries representing JSON payloads - If keys do not match per the schema, records are removed from the - passed in json_records list + Returns: + [bool] True if any log in the list matches the schema, False if not """ - schema_keys = set(self.schema.keys()) - valid_records = [] + schema_keys = set(schema.keys()) schema_match = False - for json_record in json_records: - json_keys = set(json_record.keys()) + for index in reversed(range(len(json_records))): + json_keys = set(json_records[index].keys()) if json_keys == schema_keys: schema_match = True - for key, key_type in self.schema.iteritems(): - # If the value is a map of defined key/value pairs + for key, key_type in schema.iteritems(): + if key == 'streamalert:envelope_keys' and isinstance(json_records[index][key], dict): + continue + # Nested key check if key_type and isinstance(key_type, dict): - # subkey check - schema_match = (set(json_record[key].keys()) == set(self.schema[key].keys())) - + schema_match = self._key_check(schema[key], [json_records[index][key]]) else: LOGGER.debug('JSON Key mismatch: %s vs. %s', json_keys, schema_keys) - if schema_match: - valid_records.append(json_record) + if not schema_match: + del json_records[index] - return valid_records + return bool(json_records) - def _parse_records(self, json_payload): + def _parse_records(self, schema, json_payload): """Iterate over a json_payload. Identify and extract nested payloads. Nested payloads can be detected with log_patterns (`records` should be a JSONpath selector that yields the desired nested records). If desired, fields present on the root record can be merged into child - events using the `envelope` option. + events using the `envelope_keys` option. 
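# --- Illustrative sketch (not part of this patch) ---
# A standalone approximation of the envelope handling described above: the
# configured envelope keys are copied from the root payload onto every nested
# record under the reserved 'streamalert:envelope_keys' key. The option name
# 'json_path' for the record selector and the sample payload below are
# assumptions for illustration only.
import jsonpath_rw

def extract_nested_records(json_payload, options):
    records_jsonpath = jsonpath_rw.parse(options['json_path'])
    envelope_keys = options.get('envelope_keys', {}).keys()
    envelope = {key: json_payload.get(key) for key in envelope_keys}

    nested_records = []
    for match in records_jsonpath.find(json_payload):
        record = match.value
        if envelope:
            record.update({'streamalert:envelope_keys': envelope})
        nested_records.append(record)
    return nested_records

# Example (assumed payload/options): each extracted record carries a
# 'streamalert:envelope_keys' sub-dict with 'application' and 'id'.
#   extract_nested_records(
#       {'application': 'my-app', 'id': 1431948983198,
#        'events': [{'example': 1, 'host': 'jumphost-1.domain.com'}]},
#       {'json_path': 'events[*]',
#        'envelope_keys': {'application': 'string', 'id': 'integer'}})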
Args: json_payload [dict]: The parsed json data @@ -187,7 +188,7 @@ def default_optional_values(key): for key_name, value_type in optional_keys.iteritems(): # Update the schema to ensure the record is valid - self.schema.update({key_name: value_type}) + schema.update({key_name: value_type}) # If the optional key isn't in our parsed json payload if key_name not in json_payload: # Set default value @@ -200,7 +201,7 @@ def default_optional_values(key): envelope = {} envelope_schema = self.options.get('envelope_keys', {}) if len(envelope_schema): - self.schema.update({'stream_log_envelope': envelope_schema}) + schema.update({'streamalert:envelope_keys': envelope_schema}) envelope_keys = envelope_schema.keys() envelope_jsonpath = jsonpath_rw.parse("$." + ",".join(envelope_keys)) envelope_matches = [match.value for match in envelope_jsonpath.find(json_payload)] @@ -210,7 +211,7 @@ def default_optional_values(key): for match in records_jsonpath.find(json_payload): record = match.value if len(envelope): - record.update({'stream_log_envelope': envelope}) + record.update({'streamalert:envelope_keys': envelope}) json_records.append(record) @@ -219,7 +220,7 @@ def default_optional_values(key): return json_records - def parse(self, data): + def parse(self, schema, data): """Parse a string into a list of JSON payloads. Args: @@ -229,26 +230,25 @@ def parse(self, data): [list] A list of dictionaries representing parsed records. [boolean] False if the data is not JSON or the data does not follow the schema. """ - if type(data) in {unicode, str}: + if isinstance(data, (unicode, str)): try: data = json.loads(data) except ValueError as err: LOGGER.debug('JSON parse failed: %s', str(err)) return False - json_records = self._parse_records(data) - valid_records = self._key_check(json_records) - - if len(valid_records): - return valid_records + json_records = self._parse_records(schema, data) + # Make sure all keys match the schema, including nests maps + if not self._key_check(schema, json_records): + return False - return False + return json_records @parser class GzipJSONParser(JSONParser): __parserid__ = 'gzip-json' - def parse(self, data): + def parse(self, schema, data): """Parse a gzipped string into JSON. Args: @@ -260,7 +260,7 @@ def parse(self, data): """ try: data = zlib.decompress(data, 47) - return super(GzipJSONParser, self).parse(data) + return super(GzipJSONParser, self).parse(schema, data) except zlib.error: return False @@ -292,7 +292,7 @@ def _get_reader(self, data): return reader - def parse(self, data): + def parse(self, schema, data): """Parse a string into a comma separated value reader object. Args: @@ -310,12 +310,23 @@ def parse(self, data): try: for row in reader: # check number of columns match - if len(row) != len(self.schema): - LOGGER.debug('csv key mismatch: %s vs. %s', len(row), len(self.schema)) + if len(row) != len(schema): + LOGGER.debug('csv key mismatch: %s vs. 
%s', len(row), len(schema)) return False - # extract the keys from the row via the index - csv_payloads.append({key: row[index] for index, key in enumerate(self.schema)}) + parsed_payload = {} + for index, key in enumerate(schema): + # extract the keys from the row via the index + parsed_payload[key] = row[index] + + # if the value for this key in the schema is a dict, this must be a nested + # value, so we should try to parse it as one and replace the value + if isinstance(schema[key], dict): + parsed_data = self.parse(schema[key], row[index]) + if parsed_data: + parsed_payload[key] = parsed_data[0] + + csv_payloads.append(parsed_payload) return csv_payloads except csv.Error: @@ -328,7 +339,7 @@ class KVParser(ParserBase): __default_separator = '=' __default_delimiter = ' ' - def parse(self, data): + def parse(self, schema, data): """Parse a key value string into a dictionary. Args: @@ -348,8 +359,8 @@ def parse(self, data): # remove any blank strings that may exist in our list fields = filter(None, data.split(delimiter)) # first check the field length matches our # of keys - if len(fields) != len(self.schema): - LOGGER.debug('KV field length mismatch: %s vs %s', fields, self.schema) + if len(fields) != len(schema): + LOGGER.debug('KV field length mismatch: %s vs %s', fields, schema) return False regex = re.compile('.+{}.+'.format(separator)) @@ -360,7 +371,7 @@ def parse(self, data): # handle duplicate keys if key in kv_payload: # load key from our configuration - kv_payload[self.schema.keys()[index]] = value + kv_payload[schema.keys()[index]] = value else: # load key from data kv_payload[key] = value @@ -376,7 +387,7 @@ def parse(self, data): class SyslogParser(ParserBase): __parserid__ = 'syslog' - def parse(self, data): + def parse(self, schema, data): """Parse a syslog string into a dictionary Matches syslog events with the following format: @@ -401,4 +412,4 @@ def parse(self, data): if not match: return False - return [{key: match.group(key) for key in self.schema.keys()}] + return [{key: match.group(key) for key in schema.keys()}] diff --git a/test/unit/conf/logs.json b/test/unit/conf/logs.json index 9c470aa59..8acc813f8 100644 --- a/test/unit/conf/logs.json +++ b/test/unit/conf/logs.json @@ -241,5 +241,39 @@ "owner": "integer" } } + }, + "test_multiple_schemas:01": { + "schema": { + "name": "string", + "identifier": "string", + "time": "string", + "type": "string", + "message": "string" + }, + "parser": "json", + "configuration": { + "log_patterns": { + "type": [ + "*file_added_event*" + ] + } + } + }, + "test_multiple_schemas:02": { + "schema": { + "name": "string", + "identifier": "string", + "time": "string", + "type": "string", + "message": "string" + }, + "parser": "json", + "configuration": { + "log_patterns": { + "type": [ + "*file_removed_event*" + ] + } + } } } \ No newline at end of file diff --git a/test/unit/conf/sources.json b/test/unit/conf/sources.json index 6b71f095c..7767eb725 100644 --- a/test/unit/conf/sources.json +++ b/test/unit/conf/sources.json @@ -13,6 +13,7 @@ }, "test_stream_2": { "logs": [ + "test_multiple_schemas", "test_log_type_json_2", "test_log_type_json_nested_osquery", "test_log_type_syslog" diff --git a/test/unit/test_classifier.py b/test/unit/test_classifier.py index 325dfa83b..b6c1aa18e 100644 --- a/test/unit/test_classifier.py +++ b/test/unit/test_classifier.py @@ -22,6 +22,8 @@ from nose.tools import assert_equal, assert_not_equal +import stream_alert.rule_processor.classifier as sa_classifier + from stream_alert.rule_processor.classifier 
import StreamPayload, StreamClassifier from stream_alert.rule_processor.pre_parsers import StreamPreParsers from stream_alert.rule_processor.config import load_config @@ -139,6 +141,8 @@ def test_map_source_2(self): classifier.map_source(payload) test_stream_2_logs = { + 'test_multiple_schemas:01', + 'test_multiple_schemas:02', 'test_log_type_json_2', 'test_log_type_json_nested_osquery', 'test_log_type_syslog' @@ -150,6 +154,57 @@ def test_map_source_2(self): assert_equal(payload.entity, 'test_stream_2') assert_equal(set(metadata.keys()), test_stream_2_logs) + def test_multiple_schema_matching(self): + """Test Matching Multiple Schemas with Log Patterns""" + kinesis_data = json.dumps({ + 'name': 'file added test', + 'identifier': 'host4.this.test', + 'time': 'Jan 01 2017', + 'type': 'lol_file_added_event_test', + 'message': 'bad_001.txt was added' + }) + # Make sure support for multiple schema matching is ON + sa_classifier.SUPPORT_MULTIPLE_SCHEMA_MATCHING = True + + payload = self.payload_generator(kinesis_stream='test_stream_2', + kinesis_data=kinesis_data) + classifier = StreamClassifier(config=self.config) + classifier.map_source(payload) + + data = self.pre_parse_kinesis(payload) + valid_parses = classifier._process_log_schemas(payload, data) + + assert_equal(len(valid_parses), 2) + assert_equal(valid_parses[0].log_name, 'test_multiple_schemas:01') + assert_equal(valid_parses[1].log_name, 'test_multiple_schemas:02') + valid_parse = classifier._check_valid_parse(valid_parses) + + assert_equal(valid_parse.log_name, 'test_multiple_schemas:01') + + def test_single_schema_matching(self): + """Test Matching One Schema with Log Patterns""" + kinesis_data = json.dumps({ + 'name': 'file removal test', + 'identifier': 'host4.this.test.also', + 'time': 'Jan 01 2017', + 'type': 'file_removed_event_test', + 'message': 'bad_001.txt was removed' + }) + # Make sure support for multiple schema matching is OFF + sa_classifier.SUPPORT_MULTIPLE_SCHEMA_MATCHING = False + + payload = self.payload_generator(kinesis_stream='test_stream_2', + kinesis_data=kinesis_data) + classifier = StreamClassifier(config=self.config) + classifier.map_source(payload) + + data = self.pre_parse_kinesis(payload) + valid_parses = classifier._process_log_schemas(payload, data) + + assert_equal(len(valid_parses), 1) + valid_parse = classifier._check_valid_parse(valid_parses) + assert_equal(valid_parse.log_name, 'test_multiple_schemas:02') + def test_classify_record_kinesis_json_optional(self): """Payload Classify JSON - optional fields""" kinesis_data = json.dumps({ diff --git a/test/unit/test_gzip_json_parser.py b/test/unit/test_gzip_json_parser.py index 8ad52b4a9..0208a9230 100644 --- a/test/unit/test_gzip_json_parser.py +++ b/test/unit/test_gzip_json_parser.py @@ -37,8 +37,8 @@ def parser_helper(self, **kwargs): schema = kwargs['schema'] options = kwargs['options'] - json_parser = self.parser_class(schema, options) - parsed_result = json_parser.parse(data) + json_parser = self.parser_class(options) + parsed_result = json_parser.parse(schema, data) return parsed_result def test_cloudwatch(self): @@ -60,9 +60,9 @@ def test_cloudwatch(self): expected_keys = (u'protocol', u'source', u'destination', u'srcport', u'destport', u'eni', u'action', u'packets', u'bytes', u'windowstart', u'windowend', u'version', u'account', - u'flowlogstatus',u'stream_log_envelope') + u'flowlogstatus',u'streamalert:envelope_keys') expected_envelope_keys = (u'logGroup', u'logStream', u'owner') for result in parsed_result: 
assert_equal(sorted(expected_keys), sorted(result.keys())) - assert_equal(sorted(expected_envelope_keys),sorted(result['stream_log_envelope'].keys())) + assert_equal(sorted(expected_envelope_keys),sorted(result['streamalert:envelope_keys'].keys())) diff --git a/test/unit/test_json_parser.py b/test/unit/test_json_parser.py index d027ac321..a64a6aad0 100644 --- a/test/unit/test_json_parser.py +++ b/test/unit/test_json_parser.py @@ -35,8 +35,8 @@ def parser_helper(self, **kwargs): schema = kwargs['schema'] options = kwargs['options'] - json_parser = self.parser_class(schema, options) - parsed_result = json_parser.parse(data) + json_parser = self.parser_class(options) + parsed_result = json_parser.parse(schema, data) return parsed_result def test_multi_nested_json(self): diff --git a/test/unit/test_kv_parser.py b/test/unit/test_kv_parser.py index 2c45970e7..389cae6db 100644 --- a/test/unit/test_kv_parser.py +++ b/test/unit/test_kv_parser.py @@ -27,8 +27,8 @@ def parser_helper(self, **kwargs): schema = kwargs['schema'] options = kwargs['options'] - kv_parser = self.parser_class(schema, options) - parsed_result = kv_parser.parse(data) + kv_parser = self.parser_class(options) + parsed_result = kv_parser.parse(schema, data) return parsed_result def test_kv_parsing(self):
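# --- Illustrative sketch (not part of this patch) ---
# The refactor above moves the schema out of the parser constructor and into
# parse(), so one parser instance can be reused across schemas. A rough usage
# sketch of the new call convention (the sample schema and data are
# assumptions for illustration):
import json
from stream_alert.rule_processor.parsers import get_parser

options = {}                                  # e.g. log_patterns, envelope_keys, delimiter
parser = get_parser('json')(options)          # previously: get_parser('json')(schema, options)

schema = {'name': 'string', 'type': 'string'}
data = json.dumps({'name': 'example', 'type': 'file_added_event'})

records = parser.parse(schema, data)          # previously: parser.parse(data)
# parse() returns a list of record dicts on success, or False when the data
# does not load as JSON or does not match the schema. Type conversion still
# happens afterwards in StreamClassifier._convert_type, which now returns
# True/False rather than the mutated payload.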