Optional Top Level Keys and Configuration Log Option #96

Merged: 6 commits, Apr 6, 2017
20 changes: 11 additions & 9 deletions conf/logs.json
@@ -41,8 +41,10 @@
       "time": "integer"
     },
     "parser": "kv",
-    "delimiter": " ",
-    "separator": "="
+    "configuration": {
+      "delimiter": " ",
+      "separator": "="
+    }
   },
   "osquery": {
     "schema": {
@@ -88,8 +90,8 @@
       "recipientAccountId": "integer"
     },
     "parser": "json",
-    "hints": {
-      "records": "Records[*]"
+    "configuration": {
+      "json_path": "Records[*]"
     }
   },
   "cloudtrail:v1.04": {
@@ -133,8 +135,8 @@
       "recipientAccountId": "integer"
     },
     "parser": "json",
-    "hints": {
-      "records": "Records[*]"
+    "configuration": {
+      "json_path": "Records[*]"
     }
   },
   "cloudwatch:cloudtrail": {
@@ -202,9 +204,9 @@
       "flowlogstatus": "string"
     },
     "parser": "gzip-json",
-    "hints": {
-      "records": "logEvents[*].extractedFields",
-      "envelope": {
+    "configuration": {
+      "json_path": "logEvents[*].extractedFields",
+      "envelope_keys": {
         "logGroup": "string",
         "logStream": "string",
         "owner": "integer"
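With these changes, every parser-specific option now nests under a single `configuration` key in conf/logs.json. As an illustrative sketch only (the log name and schema below are hypothetical, not part of this diff), a definition exercising all three supported options might look like:

  "example:service": {
    "schema": {
      "name": "string",
      "result": "string",
      "time": "integer"
    },
    "parser": "json",
    "configuration": {
      "json_path": "logEvents[*]",
      "envelope_keys": {
        "logGroup": "string"
      },
      "optional_top_level_keys": {
        "tags": [],
        "context": {}
      }
    }
  }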
24 changes: 12 additions & 12 deletions stream_alert/rule_processor/classifier.py
@@ -222,35 +222,33 @@ def _parse(self, payload, data):
         Returns:
             A boolean representing the success of the parse.
         """
-
         log_metadata = self.log_metadata(payload)
         # TODO(jack) make this process more efficient.
         # Separate out parsing with key matching.
         # Right now, if keys match but the type/parser is correct,
         # it has to start over
         for log_name, attributes in log_metadata.iteritems():
-            # short circuit parser determination
+            # Short circuit parser determination
             if not payload.type:
                 parser_name = attributes['parser']
             else:
                 parser_name = payload.type

             options = {}
-            options['hints'] = attributes.get('hints')
-            options['delimiter'] = attributes.get('delimiter')
-            options['separator'] = attributes.get('separator')
+            options['hints'] = attributes.get('hints', {})
+            options['configuration'] = attributes.get('configuration', {})
            options['parser'] = parser_name
             options['service'] = payload.service

             schema = attributes['schema']

-            # Setup the parser
+            # Setup the parser class
             parser_class = get_parser(parser_name)
             parser = parser_class(data, schema, options)
-            options['nested_keys'] = parser.__dict__.get('nested_keys')
+            # A list of parsed records
             parsed_data = parser.parse()

-            # Used for short circuiting parser determination
+            # Set the payload type to short circuit parser determination
             if parser.payload_type:
                 payload.type = parser.payload_type

@@ -259,8 +257,10 @@
         logger.debug('parsed_data: %s', parsed_data)
         typed_data = []
         for data in parsed_data:
-            # convert data types per the schema
-            typed_data.append(self._convert_type(data, schema, options))
+            # Convert data types per the schema
+            # Use the parser.schema due to updates caused by
+            # configuration settings such as envelope and optional_keys
+            typed_data.append(self._convert_type(data, parser.schema, options))

         if typed_data:
             payload.log_source = log_name
@@ -296,14 +296,14 @@ def _convert_type(self, parsed_data, schema, options):
             elif value == 'integer':
                 try:
                     payload[key] = int(payload[key])
-                except ValueError as e:
+                except ValueError:
                     logger.error('Invalid schema - %s is not an int', key)
                     return False

             elif value == 'float':
                 try:
                     payload[key] = float(payload[key])
-                except ValueError as e:
+                except ValueError:
                     logger.error('Invalid schema - %s is not a float', key)
                     return False

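For reference, a minimal Python sketch of the handoff this diff produces (the attribute values are invented; only the key layout reflects the change):

  # Hypothetical log attributes, mirroring the new conf/logs.json layout
  attributes = {
      'parser': 'kv',
      'schema': {'user': 'string', 'result': 'string', 'time': 'integer'},
      'configuration': {'delimiter': ' ', 'separator': '='},
  }

  # Shape of the options dict built in _parse(): 'configuration' arrives as
  # one nested dict instead of top-level 'delimiter'/'separator' entries,
  # and both lookups now default to {} when the key is absent
  options = {
      'hints': attributes.get('hints', {}),
      'configuration': attributes.get('configuration', {}),
      'parser': attributes['parser'],
      'service': 's3',
  }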
89 changes: 61 additions & 28 deletions stream_alert/rule_processor/parsers.py
@@ -16,15 +16,17 @@

 import csv
 import json
-import jsonpath_rw
-import zlib
 import logging
 import re
 import StringIO
+import zlib

 from abc import ABCMeta, abstractmethod
 from collections import OrderedDict
 from fnmatch import fnmatch

+import jsonpath_rw
+
 logging.basicConfig()
 logger = logging.getLogger('StreamAlert')

@@ -133,26 +135,55 @@ def _parse_records(self, json_payload):
         If desired, fields present on the root record can be merged into child
         events using the `envelope` option.

-
         Args:
-            json_payload: A dict of the parsed json data
-            schema: A dict of a log type's schema
+            json_payload [dict]: The parsed json data
+            schema [dict]: A log type's schema

         Returns:
-            A list of dict JSON payloads
+            [list] of dictionaries representing JSON payloads
         """
         json_records = []
         envelope = {}

-        hints = self.options.get('hints', {})
-        if hints:
-            records_schema = hints.get('records')
-            envelope_schema = hints.get('envelope', {})
-
-        if (hints and len(hints) and records_schema):
+        # Check configuration options
+        config_options = self.options.get('configuration')
+        if config_options:
+            records_schema = config_options.get('json_path')
+            envelope_schema = config_options.get('envelope_keys', {})
+            optional_keys = config_options.get('optional_top_level_keys')
+
+        # Handle optional keys
+        if config_options and optional_keys:
+            # Note: This function exists because dict/OrderedDict cannot
+            # be keys in a dictionary.
+            def default_optional_values(key):
+                """Return a default value for a given schema type"""
+                if key == 'string':
+                    return str()
+                elif key == 'integer':
+                    return int()
+                elif key == 'float':
+                    return float()
+                elif key == 'boolean':
+                    return bool()
+                elif key == []:
+                    return list()
+                elif key == OrderedDict():

    (Inline review comment on the line above.)

    austinbyers (Contributor):
    This smells funny. As you point out, lists and dicts cannot be keys in a Python dictionary. So why do you allow them as your own keys? Why isn't it just key == 'list' and key == 'dict'?

    Keep in mind, this will only match if the key is an empty OrderedDict. Do you mean to use type?

    Finally, if key == OrderedDict(), you return dict(), meaning you've removed the ordering. Why the discrepancy in types?

    jacknagz (Contributor) replied, Apr 6, 2017:
    @austinbyers it's not key == 'list' or 'dict' because in the schemas they are dictated as [] or {}. Since we load as an OrderedDict, an empty {} is converted to OrderedDict(). It's meant to be empty.

+                    return dict()
+
+            for key_name, value_type in optional_keys.iteritems():
+                # Update the schema to ensure the record is valid
+                self.schema.update({key_name: value_type})
+                # If the optional key isn't in our parsed json payload
+                if key_name not in json_payload:
+                    # Set default value
+                    json_payload[key_name] = default_optional_values(value_type)
+
+        # Handle jsonpath extraction of records
+        if config_options and records_schema:
             records_jsonpath = jsonpath_rw.parse(records_schema)
             if len(envelope_schema):
-                self.schema.update({"envelope": envelope_schema})
+                self.schema.update({'envelope': envelope_schema})
                 envelope_keys = envelope_schema.keys()
                 envelope_jsonpath = jsonpath_rw.parse("$." + ",".join(envelope_keys))
                 envelope_matches = [match.value for match in envelope_jsonpath.find(json_payload)]
@@ -161,7 +192,7 @@ def _parse_records(self, json_payload):
             for match in records_jsonpath.find(json_payload):
                 record = match.value
                 if len(envelope):
-                    record.update({"envelope": envelope})
+                    record.update({'envelope': envelope})
                 json_records.append(record)
         else:
             json_records.append(json_payload)
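The extraction above can be sketched standalone. The snippet below mirrors the jsonpath_rw calls in _parse_records against a made-up CloudWatch-style payload; the dict(zip(...)) step is an assumption about how the envelope_keys names are paired with envelope_matches, since that line falls outside the visible hunks:

  import jsonpath_rw

  json_payload = {
      'logGroup': 'example-group',
      'logEvents': [
          {'extractedFields': {'action': 'ACCEPT'}},
          {'extractedFields': {'action': 'REJECT'}},
      ],
  }

  # 'json_path' selects the child records; 'envelope_keys' name root fields
  records_jsonpath = jsonpath_rw.parse('logEvents[*].extractedFields')
  envelope_jsonpath = jsonpath_rw.parse('$.logGroup')

  envelope_matches = [match.value for match in envelope_jsonpath.find(json_payload)]
  envelope = dict(zip(['logGroup'], envelope_matches))  # assumed pairing

  json_records = []
  for match in records_jsonpath.find(json_payload):
      record = match.value
      record.update({'envelope': envelope})
      json_records.append(record)

  # json_records == [
  #     {'action': 'ACCEPT', 'envelope': {'logGroup': 'example-group'}},
  #     {'action': 'REJECT', 'envelope': {'logGroup': 'example-group'}}]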
Expand All @@ -183,8 +214,8 @@ def parse(self):
try:
json_payload = json.loads(data)
self.payload_type = 'json'
except ValueError as e:
logger.debug('JSON parse failed: %s', str(e))
except ValueError as err:
logger.debug('JSON parse failed: %s', str(err))
return False

json_records = self._parse_records(json_payload)
@@ -202,16 +233,14 @@ class GzipJSONParser(JSONParser):
     def parse(self):
         """Parse a gzipped string into JSON.

-        Options:
-            - hints
         Returns:
             - An array of parsed JSON records.
             - False if the data is not Gzipped JSON or the columns do not match.
         """
         try:
-            json_payload = zlib.decompress(self.data,47)
+            json_payload = zlib.decompress(self.data, 47)
             self.data = json_payload
-            return super(GzipJSONParser,self).parse()
+            return super(GzipJSONParser, self).parse()

         except zlib.error:
             return False
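A note on the magic number: wbits=47 is 32 + 15, that is, the maximum window size plus the flag that tells zlib to auto-detect a gzip or zlib header. A standalone sketch (sample data invented):

  import json
  import zlib

  original = json.dumps({'logGroup': 'example', 'logEvents': []})
  compressed = zlib.compress(original.encode(), 9)

  # 47 = 32 + 15: accept either gzip- or zlib-framed input
  decompressed = zlib.decompress(compressed, 47)
  assert json.loads(decompressed) == {'logGroup': 'example', 'logEvents': []}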
@@ -221,23 +250,25 @@ class CSVParser(ParserBase):
     __parserid__ = 'csv'
     __default_delimiter = ','

-    def _get_reader(self):
+    def _get_reader(self, config_options):
         """Return the CSV reader for the given payload source

+        Args:
+            config_options [map]: Map containing parser options such as delimiter
+
         Returns:
             - CSV reader object if the parse was successful
             - False if parse was unsuccessful
         """
         data = self.data
-        service = self.options['service']
-        delimiter = self.options['delimiter'] or self.__default_delimiter
+        delimiter = config_options.get('delimiter') or self.__default_delimiter

         # TODO(ryandeivert): either subclass a current parser or add a new
         # parser to support parsing CSV data that contains a header line
         try:
             csv_data = StringIO.StringIO(data)
             reader = csv.reader(csv_data, delimiter=delimiter)
-        except ValueError, csv.Error:
+        except (ValueError, csv.Error):
             return False

         return reader
@@ -254,11 +285,12 @@ def parse(self):
         """
         schema = self.schema
         hints = self.options.get('hints')
+        config_options = self.options.get('configuration')

         hint_result = []
         csv_payloads = []

-        reader = self._get_reader()
+        reader = self._get_reader(config_options)
         if not reader:
             return False
         try:
@@ -291,7 +323,7 @@

                 csv_payloads.append(csv_payload)

-                return csv_payloads
+            return csv_payloads
         except csv.Error:
             return False

@@ -315,9 +347,10 @@ def parse(self):
         data = self.data
         schema = self.schema
         options = self.options
+        config_options = options.get('configuration', {})

-        delimiter = options['delimiter'] or self.__default_delimiter
-        separator = options['separator'] or self.__default_separator
+        delimiter = config_options.get('delimiter') or self.__default_delimiter
+        separator = config_options.get('separator') or self.__default_separator

         kv_payload = {}
         try:
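To make the optional-key behavior concrete, here is a minimal standalone sketch of what _parse_records now does with optional_top_level_keys (the schema, key names, and payload are invented for illustration, and the helper is abridged to the types used here):

  from collections import OrderedDict

  # Abridged mirror of the diff's default_optional_values helper
  def default_optional_values(key):
      if key == 'string':
          return str()
      elif key == 'integer':
          return int()
      elif key == []:
          return list()
      elif key == OrderedDict():
          return dict()

  schema = OrderedDict([('name', 'string'), ('time', 'integer')])
  optional_keys = OrderedDict([('tags', []), ('context', OrderedDict())])
  json_payload = {'name': 'alice', 'time': 1491500000}

  for key_name, value_type in optional_keys.items():
      # Update the schema so the backfilled record still validates
      schema.update({key_name: value_type})
      # Missing optional keys get an empty default of the declared type
      if key_name not in json_payload:
          json_payload[key_name] = default_optional_values(value_type)

  # json_payload == {'name': 'alice', 'time': 1491500000,
  #                  'tags': [], 'context': {}}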
2 changes: 2 additions & 0 deletions test/scripts/autopep8.sh
@@ -0,0 +1,2 @@
+#! /bin/bash
+autopep8 --in-place --aggressive --aggressive "$1"
2 changes: 1 addition & 1 deletion test/scripts/unit_tests.sh
@@ -1,2 +1,2 @@
 #! /bin/bash
-nosetests test/unit --with-coverage --cover-package=stream_alert
+nosetests test/unit --with-coverage --cover-package=stream_alert --cover-package=stream_alert_cli --cover-min-percentage=80