
Commit

Merge pull request #131 from airbnb/ryandeivert-fix-for-log-patterns
[lambda][rule] fixing a bug related to log_pattern enforcement
ryandeivert authored May 5, 2017
2 parents a4f5d0f + 300a722 commit 7cedf2a
Showing 9 changed files with 234 additions and 121 deletions.
4 changes: 2 additions & 2 deletions docs/source/conf-schemas.rst
@@ -320,7 +320,7 @@ The resultant parsed records::
         "example": 1,
         "host": "jumphost-1.domain.com",
         "time": "11:00 PM",
-        "envelope": {
+        "streamalert:envelope_keys": {
             "id": 1431948983198,
             "application": "my-app"
         }
@@ -329,7 +329,7 @@
         "example": 2,
         "host": "jumphost-2.domain.com",
         "time": "12:00 AM",
-        "envelope": {
+        "streamalert:envelope_keys": {
             "id": 1431948983198,
             "application": "my-app"
         }
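The docs change reflects the renamed envelope key: values lifted from an envelope now land under the namespaced `streamalert:envelope_keys` instead of `envelope`. A minimal sketch of consuming that key from a parsed record (the function name and record literal are illustrative, not part of StreamAlert's rule API):

```python
def my_app_events(rec):
    """Hypothetical check against the namespaced envelope key."""
    envelope = rec.get('streamalert:envelope_keys', {})
    return envelope.get('application') == 'my-app'

# Record shape taken from the docs example above
record = {
    'example': 1,
    'host': 'jumphost-1.domain.com',
    'time': '11:00 PM',
    'streamalert:envelope_keys': {
        'id': 1431948983198,
        'application': 'my-app'
    }
}

assert my_app_events(record)
```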
140 changes: 76 additions & 64 deletions stream_alert/rule_processor/classifier.py
@@ -16,13 +16,17 @@

 import logging

-from collections import OrderedDict
+from collections import namedtuple, OrderedDict

 from stream_alert.rule_processor.parsers import get_parser

 logging.basicConfig()
 LOGGER = logging.getLogger('StreamAlert')

+# Set the below to True when we want to support matching on multiple schemas
+# and then log_patterns will be used as a fall back for key/value matching
+SUPPORT_MULTIPLE_SCHEMA_MATCHING = False
+
 class InvalidSchemaError(Exception):
     """Raise this exception if a declared schema field type does not match
     the data passed."""
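The new `namedtuple` import supports the `ClassifiedLog` tuple introduced below in `_process_log_schemas`, which replaces the old positional `(log_name, parser, parsed_data)` tuples. A quick sketch of the shape (the field values here are made up):

```python
from collections import namedtuple

# Same field list as the 'ClassifiedLog' namedtuple in the diff below
ClassifiedLog = namedtuple('ClassifiedLog', 'log_name, root_schema, parser, parsed_data')

parse = ClassifiedLog(
    log_name='example_log',              # made-up log name
    root_schema={'example': 'integer'},  # made-up schema
    parser=None,                         # a parser instance in the real code
    parsed_data=[{'example': 1}]
)

# Named access replaces positional indexing like valid_parse[0] / valid_parse[2]
print(parse.log_name, parse.parsed_data)
```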
@@ -212,50 +216,47 @@ def _check_valid_parse(self, valid_parses):
         Returns:
             [tuple] The proper tuple to use for parsing from the list of tuples
         """
-        if len(valid_parses) == 1:
+        # If there is only one parse or we do not have support for multiple schemas
+        # enabled, then just return the first parse that was valid
+        if len(valid_parses) == 1 or not SUPPORT_MULTIPLE_SCHEMA_MATCHING:
             return valid_parses[0]

         matched_parses = []
         for i, valid_parse in enumerate(valid_parses):
-            parser = valid_parse[1]
-            for data in valid_parse[2]:
-                if parser.matched_log_pattern(data, parser.options.get('log_patterns', {})):
-                    matched_parses.append(valid_parses[i])
-                    break
-                else:
-                    LOGGER.debug('log pattern matching failed for schema: %s', parser.schema)
+            log_patterns = valid_parse.parser.options.get('log_patterns', {})
+            if (all(valid_parse.parser.matched_log_pattern(data, log_patterns)
+                    for data in valid_parse.parsed_data)):
+                matched_parses.append(valid_parses[i])
+            else:
+                LOGGER.debug('log pattern matching failed for schema: %s', valid_parse.root_schema)

         if matched_parses:
             if len(matched_parses) > 1:
                 LOGGER.error('log patterns matched for multiple schemas: %s',
-                             ', '.join(name for name, _, _ in matched_parses))
-                LOGGER.error('proceeding with schema for: %s', matched_parses[0][0])
+                             ', '.join(parse.log_name for parse in matched_parses))
+                LOGGER.error('proceeding with schema for: %s', matched_parses[0].log_name)

             return matched_parses[0]

-
-
         LOGGER.error('log classification matched for multiple schemas: %s',
-                     ', '.join(name for name, _, _ in valid_parses))
-        LOGGER.error('proceeding with schema for: %s', valid_parses[0][0])
+                     ', '.join(parse.log_name for parse in valid_parses))
+        LOGGER.error('proceeding with schema for: %s', valid_parses[0].log_name)

         return valid_parses[0]

-    def _parse(self, payload, data):
-        """Parse a record into a declared type.
+    def _process_log_schemas(self, payload, data):
+        """Get any log schemas that matched this log format

         Args:
             payload: A StreamAlert payload object
             data: Pre parsed data string from a raw_event to be parsed

-        Sets:
-            payload.log_source: The detected log name from the data_sources config.
-            payload.type: The record's type.
-            payload.records: The parsed record.
-
         Returns:
-            A boolean representing the success of the parse.
+            [list] A list containing any schemas that matched this log format
+                Each list entry contains the namedtuple of 'ClassifiedLog' with
+                values of log_name, root_schema, parser, and parsed_data
         """
+        classified_log = namedtuple('ClassifiedLog', 'log_name, root_schema, parser, parsed_data')
         log_metadata = self._log_metadata()
         valid_parses = []
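The fix in `_check_valid_parse` above: every parsed record must now match the schema's `log_patterns` before the schema counts as a pattern match, where the old loop accepted a schema as soon as any one record matched. A toy illustration of that `all(...)` semantics, with a simplified stand-in for `matched_log_pattern` (the real implementation lives in the parsers module and may differ):

```python
import fnmatch

def matched_log_pattern(record, log_patterns):
    """Simplified stand-in: every declared pattern must match its key."""
    return all(
        fnmatch.fnmatch(str(record.get(key, '')), pattern)
        for key, pattern in (log_patterns or {}).items()
    )

records = [{'host': 'jumphost-1.domain.com'}, {'host': 'db-1.domain.com'}]
patterns = {'host': 'jumphost-*'}

# One matching record is no longer enough; all of them must match
print(any(matched_log_pattern(rec, patterns) for rec in records))  # True  (old behavior)
print(all(matched_log_pattern(rec, patterns) for rec in records))  # False (new behavior)
```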

@@ -269,42 +270,60 @@ def _parse(self, payload, data):

             # Setup the parser class
             parser_class = get_parser(parser_name)
-            parser = parser_class(schema, options)
+            parser = parser_class(options)

             # Get a list of parsed records
-            parsed_data = parser.parse(data)
+            parsed_data = parser.parse(schema, data)

             LOGGER.debug('schema: %s', schema)
-            if parsed_data:
-                valid_parses.append((log_name, parser, parsed_data))
+            if not parsed_data:
+                continue
+
+            if SUPPORT_MULTIPLE_SCHEMA_MATCHING:
+                valid_parses.append(classified_log(log_name, schema, parser, parsed_data))
+                continue
+
+            log_patterns = parser.options.get('log_patterns')
+            if all(parser.matched_log_pattern(rec, log_patterns) for rec in parsed_data):
+                return [classified_log(log_name, schema, parser, parsed_data)]

+        return valid_parses
+
+    def _parse(self, payload, data):
+        """Parse a record into a declared type.
+
+        Args:
+            payload: A StreamAlert payload object
+            data: Pre parsed data string from a raw_event to be parsed
+
+        Sets:
+            payload.log_source: The detected log name from the data_sources config.
+            payload.type: The record's type.
+            payload.records: The parsed record.
+
+        Returns:
+            A boolean representing the success of the parse.
+        """
+        valid_parses = self._process_log_schemas(payload, data)
+
         if not valid_parses:
             return False

         valid_parse = self._check_valid_parse(valid_parses)

-        log_name, parser, parsed_data = valid_parse[0], valid_parse[1], valid_parse[2]
-        LOGGER.debug('log_name: %s', log_name)
-        LOGGER.debug('parsed_data: %s', parsed_data)
+        LOGGER.debug('log_name: %s', valid_parse.log_name)
+        LOGGER.debug('parsed_data: %s', valid_parse.parsed_data)

-        typed_data = []
-        for data in parsed_data:
+        for data in valid_parse.parsed_data:
             # Convert data types per the schema
-            # Use the parser.schema due to updates caused by
-            # configuration settings such as envelope and optional_keys
-            converted_data = self._convert_type(data, parser.type(), parser.schema, parser.options)
-            if not converted_data:
-                payload.valid = False
-                break
-
-            typed_data.append(converted_data)
-
-        if not typed_data:
-            return False
+            # Use the root schema for the parser due to updates caused by
+            # configuration settings such as envelope_keys and optional_keys
+            if not self._convert_type(data, valid_parse.parser.type(), valid_parse.root_schema, valid_parse.parser.options):
+                return False

-        payload.log_source = log_name
-        payload.type = parser.type()
-        payload.records = parsed_data
+        payload.log_source = valid_parse.log_name
+        payload.type = valid_parse.parser.type()
+        payload.records = valid_parse.parsed_data

         return True
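Taken together, the refactor splits classification in two: `_process_log_schemas` gathers candidate `ClassifiedLog` tuples (short-circuiting on the first `log_patterns` match while multi-schema support is off), and `_parse` picks the winner and annotates the payload. A condensed, self-contained sketch of that control flow using stubs (neither `StubPayload` nor this free-standing `parse` is a StreamAlert class or function):

```python
from collections import namedtuple

ClassifiedLog = namedtuple('ClassifiedLog', 'log_name, root_schema, parser, parsed_data')

class StubPayload(object):
    """Bare stand-in for the payload object the classifier mutates."""
    log_source = None
    type = None
    records = None

def parse(payload, valid_parses):
    # Mirrors the tail of the new _parse(): bail if nothing matched,
    # otherwise copy the winning parse onto the payload
    if not valid_parses:
        return False
    winner = valid_parses[0]  # _check_valid_parse() in the real code
    payload.log_source = winner.log_name
    payload.records = winner.parsed_data
    return True

payload = StubPayload()
candidates = [ClassifiedLog('example_log', {'example': 'integer'}, None, [{'example': 1}])]
print(parse(payload, candidates), payload.log_source)  # True example_log
```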

@@ -348,31 +367,24 @@ def _convert_type(self, payload, parser_type, schema, options):
             elif value == 'boolean':
                 payload[key] = str(payload[key]).lower() == 'true'

-            elif isinstance(value, OrderedDict):
-                # allow for any value to exist in the map
-                if value:
-                    # handle nested csv
-                    # skip the 'stream_log_envelope' key that we've added during parsing
-                    if key == 'stream_log_envelope' and isinstance(payload[key], dict):
-                        continue
+            elif isinstance(value, dict):
+                if not value:
+                    continue  # allow empty maps (dict)

-                    if 'log_patterns' in options:
-                        options['log_patterns'] = options['log_patterns'][key]
+                # handle nested values
+                # skip the 'streamalert:envelope_keys' key that we've added during parsing
+                if key == 'streamalert:envelope_keys' and isinstance(payload[key], dict):
+                    continue

-                    sub_schema = schema[key]
-                    parser = get_parser(parser_type)(sub_schema, options)
-                    parsed_nested_key = parser.parse(payload[key])
+                if 'log_patterns' in options:
+                    options['log_patterns'] = options['log_patterns'][key]

-                    # Call the first element since a list is returned
-                    if parsed_nested_key:
-                        payload[key] = parsed_nested_key[0]
-
-                    self._convert_type(payload[key], parser_type, sub_schema, options)
+                self._convert_type(payload[key], parser_type, schema[key], options)

             elif isinstance(value, list):
                 pass

             else:
                 LOGGER.error('Unsupported schema type: %s', value)

-        return payload
+        return True
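`_convert_type` now recurses on plain `dict` schemas rather than only `OrderedDict`, skips the injected `streamalert:envelope_keys` map, drops the nested re-parse, and returns a boolean instead of the mutated payload. A simplified sketch of that recursion (string/float branches, logging, and error handling omitted):

```python
def convert_type(payload, schema):
    """Cast leaf values in place per the schema; recurse into nested dicts."""
    for key, value in schema.items():
        if value == 'integer':
            payload[key] = int(payload[key])
        elif value == 'boolean':
            payload[key] = str(payload[key]).lower() == 'true'
        elif isinstance(value, dict):
            if not value:
                continue  # empty map: accept any value
            # skip the envelope map injected during parsing
            if key == 'streamalert:envelope_keys' and isinstance(payload[key], dict):
                continue
            convert_type(payload[key], value)
    return True

record = {'example': '1', 'nested': {'count': '42'},
          'streamalert:envelope_keys': {'id': 1431948983198}}
schema = {'example': 'integer', 'nested': {'count': 'integer'},
          'streamalert:envelope_keys': {}}
convert_type(record, schema)
print(record['example'], record['nested']['count'])  # 1 42
```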
