diff --git a/conf/logs.json b/conf/logs.json
index 8c6c42856..68ca15a6d 100644
--- a/conf/logs.json
+++ b/conf/logs.json
@@ -41,8 +41,10 @@
       "time": "integer"
     },
     "parser": "kv",
-    "delimiter": " ",
-    "separator": "="
+    "configuration": {
+      "delimiter": " ",
+      "separator": "="
+    }
   },
   "osquery": {
     "schema": {
@@ -88,8 +90,8 @@
       "recipientAccountId": "integer"
     },
     "parser": "json",
-    "hints": {
-      "records": "Records[*]"
+    "configuration": {
+      "json_path": "Records[*]"
     }
   },
   "cloudtrail:v1.04": {
@@ -133,8 +135,8 @@
       "recipientAccountId": "integer"
     },
     "parser": "json",
-    "hints": {
-      "records": "Records[*]"
+    "configuration": {
+      "json_path": "Records[*]"
     }
   },
   "cloudwatch:cloudtrail": {
@@ -202,9 +204,9 @@
       "flowlogstatus": "string"
     },
     "parser": "gzip-json",
-    "hints": {
-      "records": "logEvents[*].extractedFields",
-      "envelope": {
+    "configuration": {
+      "json_path": "logEvents[*].extractedFields",
+      "envelope_keys": {
         "logGroup": "string",
         "logStream": "string",
         "owner": "integer"
diff --git a/stream_alert/rule_processor/classifier.py b/stream_alert/rule_processor/classifier.py
index 5b580a908..510b13e38 100644
--- a/stream_alert/rule_processor/classifier.py
+++ b/stream_alert/rule_processor/classifier.py
@@ -222,35 +222,33 @@ def _parse(self, payload, data):
         Returns:
             A boolean representing the success of the parse.
         """
-        log_metadata = self.log_metadata(payload)
         # TODO(jack) make this process more efficient.
         # Separate out parsing with key matching.
         # Right now, if keys match but the type/parser is correct,
         # it has to start over
         for log_name, attributes in log_metadata.iteritems():
-            # short circuit parser determination
+            # Short circuit parser determination
             if not payload.type:
                 parser_name = attributes['parser']
             else:
                 parser_name = payload.type
 
             options = {}
-            options['hints'] = attributes.get('hints')
-            options['delimiter'] = attributes.get('delimiter')
-            options['separator'] = attributes.get('separator')
+            options['hints'] = attributes.get('hints', {})
+            options['configuration'] = attributes.get('configuration', {})
             options['parser'] = parser_name
-            options['service'] = payload.service
+
             schema = attributes['schema']
 
-            # Setup the parser
+            # Setup the parser class
             parser_class = get_parser(parser_name)
             parser = parser_class(data, schema, options)
             options['nested_keys'] = parser.__dict__.get('nested_keys')
             # A list of parsed records
             parsed_data = parser.parse()
 
-            # Used for short circuiting parser determination
+            # Set the payload type to short circuit parser determination
             if parser.payload_type:
                 payload.type = parser.payload_type
@@ -259,8 +257,10 @@ def _parse(self, payload, data):
                 logger.debug('parsed_data: %s', parsed_data)
                 typed_data = []
                 for data in parsed_data:
-                    # convert data types per the schema
-                    typed_data.append(self._convert_type(data, schema, options))
+                    # Convert data types per the schema
+                    # Use the parser.schema due to updates caused by
+                    # configuration settings such as envelope and optional_keys
+                    typed_data.append(self._convert_type(data, parser.schema, options))
 
                 if typed_data:
                     payload.log_source = log_name
@@ -296,14 +296,14 @@ def _convert_type(self, parsed_data, schema, options):
             elif value == 'integer':
                 try:
                     payload[key] = int(payload[key])
-                except ValueError as e:
+                except ValueError:
                     logger.error('Invalid schema - %s is not an int', key)
                     return False
             elif value == 'float':
                 try:
                     payload[key] = float(payload[key])
-                except ValueError as e:
+                except ValueError:
                     logger.error('Invalid schema - %s is not a float', key)
                     return False
diff --git a/stream_alert/rule_processor/parsers.py b/stream_alert/rule_processor/parsers.py
index 6a6b3547f..ddb607bed 100644
--- a/stream_alert/rule_processor/parsers.py
+++ b/stream_alert/rule_processor/parsers.py
@@ -16,15 +16,17 @@
 import csv
 import json
-import jsonpath_rw
-import zlib
 import logging
 import re
 import StringIO
+import zlib
 
 from abc import ABCMeta, abstractmethod
+from collections import OrderedDict
 from fnmatch import fnmatch
 
+import jsonpath_rw
+
 logging.basicConfig()
 logger = logging.getLogger('StreamAlert')
@@ -133,26 +135,55 @@ def _parse_records(self, json_payload):
         If desired, fields present on the root record can be merged into
         child events using the `envelope` option.
 
-        Args:
-            json_payload: A dict of the parsed json data
-            schema: A dict of a log type's schema
+            json_payload [dict]: The parsed json data
+            schema [dict]: A log type's schema
 
         Returns:
-            A list of dict JSON payloads
+            [list] of dictionaries representing JSON payloads
         """
         json_records = []
         envelope = {}
 
-        hints = self.options.get('hints', {})
-        if hints:
-            records_schema = hints.get('records')
-            envelope_schema = hints.get('envelope', {})
-
-        if (hints and len(hints) and records_schema):
+        # Check configuration options
+        config_options = self.options.get('configuration')
+        if config_options:
+            records_schema = config_options.get('json_path')
+            envelope_schema = config_options.get('envelope_keys', {})
+            optional_keys = config_options.get('optional_top_level_keys')
+
+        # Handle optional keys
+        if config_options and optional_keys:
+            # Note: This function exists because dict/OrderedDict cannot
+            # be keys in a dictionary.
+            def default_optional_values(key):
+                """Return a default value for a given schema type"""
+                if key == 'string':
+                    return str()
+                elif key == 'integer':
+                    return int()
+                elif key == 'float':
+                    return float()
+                elif key == 'boolean':
+                    return bool()
+                elif key == []:
+                    return list()
+                elif key == OrderedDict():
+                    return dict()
+
+            for key_name, value_type in optional_keys.iteritems():
+                # Update the schema to ensure the record is valid
+                self.schema.update({key_name: value_type})
+                # If the optional key isn't in our parsed json payload
+                if key_name not in json_payload:
+                    # Set default value
+                    json_payload[key_name] = default_optional_values(value_type)
+
+        # Handle jsonpath extraction of records
+        if config_options and records_schema:
             records_jsonpath = jsonpath_rw.parse(records_schema)
             if len(envelope_schema):
-                self.schema.update({"envelope": envelope_schema})
+                self.schema.update({'envelope': envelope_schema})
                 envelope_keys = envelope_schema.keys()
                 envelope_jsonpath = jsonpath_rw.parse("$." + ",".join(envelope_keys))
                 envelope_matches = [match.value for match in envelope_jsonpath.find(json_payload)]
@@ -161,7 +192,7 @@ def _parse_records(self, json_payload):
             for match in records_jsonpath.find(json_payload):
                 record = match.value
                 if len(envelope):
-                    record.update({"envelope": envelope})
+                    record.update({'envelope': envelope})
                 json_records.append(record)
         else:
             json_records.append(json_payload)
@@ -183,8 +214,8 @@ def parse(self):
         try:
             json_payload = json.loads(data)
             self.payload_type = 'json'
-        except ValueError as e:
-            logger.debug('JSON parse failed: %s', str(e))
+        except ValueError as err:
+            logger.debug('JSON parse failed: %s', str(err))
             return False
 
         json_records = self._parse_records(json_payload)
@@ -202,16 +233,14 @@ class GzipJSONParser(JSONParser):
     def parse(self):
         """Parse a gzipped string into JSON.
 
-        Options:
-            - hints
 
         Returns:
             - An array of parsed JSON records.
             - False if the data is not Gzipped JSON or the columns do not match.
         """
         try:
-            json_payload = zlib.decompress(self.data,47)
+            json_payload = zlib.decompress(self.data, 47)
             self.data = json_payload
-            return super(GzipJSONParser,self).parse()
+            return super(GzipJSONParser, self).parse()
 
         except zlib.error:
             return False
@@ -221,23 +250,25 @@ class CSVParser(ParserBase):
     __parserid__ = 'csv'
     __default_delimiter = ','
 
-    def _get_reader(self):
+    def _get_reader(self, config_options):
         """Return the CSV reader for the given payload source
 
+        Args:
+            config_options [map]: Map containing parser options such as delimiter
+
         Returns:
             - CSV reader object if the parse was successful
             - False if parse was unsuccessful
         """
         data = self.data
-        service = self.options['service']
-        delimiter = self.options['delimiter'] or self.__default_delimiter
+        delimiter = config_options.get('delimiter') or self.__default_delimiter
 
         # TODO(ryandeivert): either subclass a current parser or add a new
         # parser to support parsing CSV data that contains a header line
         try:
             csv_data = StringIO.StringIO(data)
             reader = csv.reader(csv_data, delimiter=delimiter)
-        except ValueError, csv.Error:
+        except (ValueError, csv.Error):
             return False
 
         return reader
@@ -254,11 +285,12 @@ def parse(self):
         """
         schema = self.schema
         hints = self.options.get('hints')
+        config_options = self.options.get('configuration')
 
         hint_result = []
         csv_payloads = []
 
-        reader = self._get_reader()
+        reader = self._get_reader(config_options)
         if not reader:
             return False
         try:
@@ -291,7 +323,7 @@ def parse(self):
 
                 csv_payloads.append(csv_payload)
 
-            return csv_payloads 
+            return csv_payloads
 
         except csv.Error:
             return False
@@ -315,9 +347,10 @@ def parse(self):
         data = self.data
         schema = self.schema
         options = self.options
+        config_options = options.get('configuration', {})
 
-        delimiter = options['delimiter'] or self.__default_delimiter
-        separator = options['separator'] or self.__default_separator
+        delimiter = config_options.get('delimiter') or self.__default_delimiter
+        separator = config_options.get('separator') or self.__default_separator
 
         kv_payload = {}
         try:
diff --git a/test/scripts/autopep8.sh b/test/scripts/autopep8.sh
new file mode 100755
index 000000000..43acb3f79
--- /dev/null
+++ b/test/scripts/autopep8.sh
@@ -0,0 +1,2 @@
+#! /bin/bash
+autopep8 --in-place --aggressive --aggressive $1
\ No newline at end of file
diff --git a/test/scripts/unit_tests.sh b/test/scripts/unit_tests.sh
index cd168ffba..d59719028 100755
--- a/test/scripts/unit_tests.sh
+++ b/test/scripts/unit_tests.sh
@@ -1,2 +1,2 @@
 #! /bin/bash
-nosetests test/unit --with-coverage --cover-package=stream_alert
\ No newline at end of file
+nosetests test/unit --with-coverage --cover-package=stream_alert --cover-package=stream_alert_cli --cover-min-percentage=80
\ No newline at end of file
diff --git a/test/unit/conf/logs.json b/test/unit/conf/logs.json
index 5a25ec2f3..c278cb2fe 100644
--- a/test/unit/conf/logs.json
+++ b/test/unit/conf/logs.json
@@ -5,6 +5,13 @@
       "key1": [],
       "key2": "string",
       "key3": "integer"
+    },
+    "configuration": {
+      "optional_top_level_keys": {
+        "key9": "boolean",
+        "key10": {},
+        "key11": "float"
+      }
     }
   },
   "test_log_type_json_2": {
@@ -40,6 +47,11 @@
         "cluster": "string",
         "number": "integer"
       }
+    },
+    "configuration": {
+      "optional_top_level_keys": {
+        "log_type": "string"
+      }
     }
   },
   "test_log_type_json_nested_with_data": {
@@ -59,8 +71,6 @@
   },
   "test_log_type_kv_auditd": {
     "parser": "kv",
-    "delimiter": " ",
-    "separator": "=",
     "schema": {
       "type": "string",
       "msg": "string",
@@ -104,6 +114,10 @@
       "ogid": "integer",
      "rdev": "string",
       "obj": "string"
+    },
+    "configuration": {
+      "delimiter": " ",
+      "separator": "="
     }
   },
   "test_log_type_csv": {
@@ -169,30 +183,30 @@
       "results": "string"
     },
     "parser": "json",
-    "hints": {
-      "records": "$.profiles[*].controls[*]"
+    "configuration": {
+      "json_path": "$.profiles[*].controls[*]"
     }
   },
   "test_cloudtrail": {
     "parser": "json",
     "schema": {
-        "eventVersion": "string",
-        "eventID": "string",
-        "eventTime": "string",
-        "requestParameters": {},
-        "eventType": "string",
-        "responseElements": "string",
-        "awsRegion": "string",
-        "eventName": "string",
-        "userIdentity": {},
-        "eventSource": "string",
-        "requestID": "string",
-        "userAgent": "string",
-        "sourceIPAddress": "string",
-        "recipientAccountId": "string"
+      "eventVersion": "string",
+      "eventID": "string",
+      "eventTime": "string",
+      "requestParameters": {},
+      "eventType": "string",
+      "responseElements": "string",
+      "awsRegion": "string",
+      "eventName": "string",
+      "userIdentity": {},
+      "eventSource": "string",
+      "requestID": "string",
+      "userAgent": "string",
+      "sourceIPAddress": "string",
+      "recipientAccountId": "string"
     },
-    "hints" : {
-      "records": "Records[*]"
+    "configuration": {
+      "json_path": "Records[*]"
     }
   },
   "test_cloudwatch": {
@@ -213,9 +227,9 @@
       "flowlogstatus": "string"
     },
     "parser": "gzip-json",
-    "hints": {
-      "records": "logEvents[*].extractedFields",
-      "envelope": {
+    "configuration": {
+      "json_path": "logEvents[*].extractedFields",
+      "envelope_keys": {
       "logGroup": "string",
       "logStream": "string",
       "owner": "integer"
diff --git a/test/unit/test_classifier.py b/test/unit/test_classifier.py
index 96dbe9d69..49e34c861 100644
--- a/test/unit/test_classifier.py
+++ b/test/unit/test_classifier.py
@@ -20,14 +20,13 @@
 import base64
 import json
 
-from collections import OrderedDict
-
-from nose.tools import assert_equal, assert_not_equal, nottest
+from nose.tools import assert_equal, assert_not_equal
 
 from stream_alert.rule_processor.classifier import StreamPayload, StreamClassifier
 from stream_alert.rule_processor.pre_parsers import StreamPreParsers
 from stream_alert.rule_processor.config import load_config
 
+
 class TestStreamPayload(object):
     @classmethod
     def setup_class(cls):
@@ -50,7 +49,7 @@ def make_kinesis_record(**kwargs):
         raw_record = {
             'eventSource': 'aws:kinesis',
             'eventSourceARN': 'arn:aws:kinesis:us-east-1:123456789012:stream/{}'
-                .format(kinesis_stream),
+                              .format(kinesis_stream),
             'kinesis': {
                 'data': base64.b64encode(kinesis_data)
             }
         }
@@ -63,7 +62,7 @@ def payload_generator(self, **kwargs):
         kinesis_data = kwargs['kinesis_data']
         kinesis_record = self.make_kinesis_record(kinesis_stream=kinesis_stream,
                                                   kinesis_data=kinesis_data)
-        
+
         payload = StreamPayload(raw_record=kinesis_record)
         return payload
@@ -82,7 +81,6 @@ def teardown(self):
         """Teardown after each method"""
         pass
 
-
     def test_refresh_record(self):
         """Payload Record Refresh"""
         kinesis_data = json.dumps({
@@ -100,13 +98,12 @@ def test_refresh_record(self):
             'key6': 'key6data'
         })
         second_record = self.make_kinesis_record(kinesis_stream='test_kinesis_stream',
-            kinesis_data=new_kinesis_data)
+                                                 kinesis_data=new_kinesis_data)
         payload.refresh_record(second_record)
 
         # check newly loaded record
         assert_equal(payload.raw_record, second_record)
 
-
     def test_map_source_1(self):
         """Payload Source Mapping 1"""
         data_encoded = base64.b64encode('test_map_source data')
@@ -132,7 +129,6 @@ def test_map_source_1(self):
         assert_equal(payload.entity, 'test_kinesis_stream')
         assert_equal(set(metadata.keys()), test_kinesis_stream_logs)
 
-
     def test_map_source_2(self):
         """Payload Source Mapping 2"""
         data_encoded = base64.b64encode('test_map_source_data_2')
@@ -154,9 +150,8 @@ def test_map_source_2(self):
         assert_equal(payload.entity, 'test_stream_2')
         assert_equal(set(metadata.keys()), test_stream_2_logs)
 
-
-    def test_classify_record_kinesis_json(self):
-        """Payload Classify JSON"""
+    def test_classify_record_kinesis_json_optional(self):
+        """Payload Classify JSON - optional fields"""
         kinesis_data = json.dumps({
             'key1': [
@@ -169,7 +164,11 @@ def test_classify_record_kinesis_json(self):
                 }
             ],
             'key2': 'more sample data',
-            'key3': '1'
+            'key3': '1',
+            'key10': {
+                'test-field': 1,
+                'test-field2': 2
+            }
         })
         payload = self.payload_generator(kinesis_stream='test_kinesis_stream',
                                          kinesis_data=kinesis_data)
@@ -191,11 +190,18 @@ def test_classify_record_kinesis_json(self):
         assert_equal(payload.type, 'json')
         assert_not_equal(payload.type, 'csv')
 
-        # record type test
-        assert_equal(type(payload.records[0]['key1']), list)
+        # record value tests
         assert_equal(len(payload.records[0]['key1']), 2)
+        assert_equal(payload.records[0]['key3'], 1)
         assert_equal(payload.records[0]['key1'][1]['test4'], 4)
 
+        # optional field tests
+        assert_equal(payload.records[0]['key11'], 0.0)
+        assert_equal(payload.records[0]['key9'], False)
+        assert_equal(len(payload.records[0]['key10']), 2)
+
+        # record type tests
+        assert_equal(type(payload.records[0]['key1']), list)
         assert_equal(type(payload.records[0]['key2']), str)
         assert_equal(type(payload.records[0]['key3']), int)
@@ -233,7 +239,6 @@ def test_classify_record_kinesis_json(self):
         assert_equal(payload.records[0]['key6'], 10)
         assert_equal(payload.records[0]['key7'], False)
 
-
     def test_classify_record_kinesis_nested_json(self):
         """Payload Classify Nested JSON"""
         kinesis_data = json.dumps({
@@ -273,7 +278,6 @@ def test_classify_record_kinesis_nested_json(self):
         assert_equal(payload.records[0]['date'], 'Jan 01 2017')
         assert_equal(payload.records[0]['data']['key1'], 'test')
 
-
     def test_classify_record_kinesis_nested_json_osquery(self):
         """Payload Classify JSON osquery"""
         kinesis_data = json.dumps({
@@ -324,7 +328,7 @@ def test_classify_record_kinesis_nested_json_osquery(self):
         assert_equal(payload.records[0]['columns']['key1'], 'test')
         assert_equal(payload.records[0]['decorations']['cluster'], 'eu-east')
         assert_equal(payload.records[0]['decorations']['number'], 100)
-        
+        assert_equal(payload.records[0]['log_type'], '')
 
     def test_classify_record_kinesis_nested_json_missing_subkey_fields(self):
         """Payload Classify Nested JSON Missing Subkeys"""
         kinesis_data = json.dumps({
@@ -346,7 +350,7 @@ def test_classify_record_kinesis_nested_json_missing_subkey_fields(self):
             }
         })
         payload = self.payload_generator(kinesis_stream='test_stream_2',
-            kinesis_data=kinesis_data)
+                                         kinesis_data=kinesis_data)
 
         classifier = StreamClassifier(config=self.config)
         classifier.map_source(payload)
@@ -358,7 +362,6 @@ def test_classify_record_kinesis_nested_json_missing_subkey_fields(self):
         assert_equal(payload.valid, False)
         assert_equal(payload.records, None)
 
-
     def test_classify_record_kinesis_nested_json_with_data(self):
         """Payload Classify Nested JSON Generic"""
         kinesis_data = json.dumps({
@@ -374,7 +377,7 @@ def test_classify_record_kinesis_nested_json_with_data(self):
             }
         })
         payload = self.payload_generator(kinesis_stream='test_kinesis_stream',
-            kinesis_data=kinesis_data)
+                                         kinesis_data=kinesis_data)
 
         classifier = StreamClassifier(config=self.config)
         classifier.map_source(payload)
@@ -385,7 +388,7 @@ def test_classify_record_kinesis_nested_json_with_data(self):
         # valid record test
         assert_equal(payload.valid, True)
         assert_equal(type(payload.records[0]), dict)
-        
+
         # log type test
         assert_equal(payload.log_source, 'test_log_type_json_nested_with_data')
@@ -404,12 +407,11 @@ def test_classify_record_kinesis_nested_json_with_data(self):
         assert_equal(payload.records[0]['date'], 'Jan 01 2017')
         assert_equal(payload.records[0]['data']['source'], 'dev-app-1')
 
-
     def test_classify_record_kinesis_csv(self):
         """Payload Classify CSV"""
         csv_data = 'jan102017,0100,host1,thisis some data with keyword1 in it'
         payload = self.payload_generator(kinesis_stream='test_kinesis_stream',
-            kinesis_data=csv_data)
+                                         kinesis_data=csv_data)
 
         classifier = StreamClassifier(config=self.config)
         classifier.map_source(payload)
@@ -433,7 +435,6 @@ def test_classify_record_kinesis_csv(self):
         # log source test
         assert_equal(payload.log_source, 'test_log_type_csv')
 
-
     def test_classify_record_kinesis_csv_nested(self):
         """Payload Classify Nested CSV"""
         csv_nested_data = (
@@ -467,7 +468,6 @@ def test_classify_record_kinesis_csv_nested(self):
         # log source test
         assert_equal(payload.log_source, 'test_log_type_csv_nested')
 
-
     def test_classify_record_kinesis_kv(self):
         """Payload Classify KV"""
         auditd_test_data = (
@@ -508,7 +508,6 @@ def test_classify_record_kinesis_kv(self):
         assert_not_equal(payload.type, 'csv')
         assert_not_equal(payload.type, 'json')
 
-
     def test_classify_record_syslog(self):
         """Payload Classify Syslog"""
         test_data_1 = (
@@ -548,7 +547,9 @@ def test_classify_record_syslog(self):
             if name == 'test_1':
                 assert_equal(payload.records[0]['host'], 'vagrant-ubuntu-trusty-64')
                 assert_equal(payload.records[0]['application'], 'sudo')
-                assert_equal(payload.records[0]['message'], 'pam_unix(sudo:session): session opened for user root by (uid=0)')
+                assert_equal(payload.records[0]['message'], 'pam_unix(sudo:session):'
+                             ' session opened for user'
+                             ' root by (uid=0)')
             elif name == 'test_2':
                 assert_equal(payload.records[0]['host'], 'macbook004154test')
                 assert_equal(payload.records[0]['application'], 'authd')
diff --git a/test/unit/test_gzip_json_parser.py b/test/unit/test_gzip_json_parser.py
index 9d5733730..402dee71e 100644
--- a/test/unit/test_gzip_json_parser.py
+++ b/test/unit/test_gzip_json_parser.py
@@ -44,13 +44,27 @@ def parser_helper(self, **kwargs):
     def test_cloudwatch(self):
         """Parse CloudWatch JSON"""
         schema = self.config['logs']['test_cloudwatch']['schema']
-        options = { "hints": self.config['logs']['test_cloudwatch']['hints']}
+        options = {
+            'configuration': self.config['logs']['test_cloudwatch']['configuration']
+        }
+
         with open('test/unit/fixtures/cloudwatch.json','r') as fixture_file:
             data = fixture_file.readlines()
         data_record = zlib.compress(data[0].strip())
-        parsed_result = self.parser_helper(data=data_record, schema=schema, options=options)
+
+        parsed_result = self.parser_helper(data=data_record,
+                                           schema=schema,
+                                           options=options)
+
         assert_not_equal(parsed_result, False)
         assert_equal(80,len(parsed_result))
+
+        expected_keys = (u'protocol', u'source', u'destination', u'srcport',
+                         u'destport', u'eni', u'action', u'packets', u'bytes',
+                         u'windowstart', u'windowend', u'version', u'account',
+                         u'flowlogstatus',u'envelope')
+        expected_envelope_keys = (u'logGroup', u'logStream', u'owner')
+
         for result in parsed_result:
-            assert_equal(sorted((u'protocol', u'source', u'destination', u'srcport', u'destport', u'eni', u'action', u'packets', u'bytes', u'windowstart', u'windowend', u'version', u'account', u'flowlogstatus',u'envelope')), sorted(result.keys()))
-            assert_equal(sorted((u"logGroup",u"logStream",u"owner")),sorted(result['envelope'].keys()))
+            assert_equal(sorted(expected_keys), sorted(result.keys()))
+            assert_equal(sorted(expected_envelope_keys),sorted(result['envelope'].keys()))
diff --git a/test/unit/test_json_parser.py b/test/unit/test_json_parser.py
index 8e581bc79..06e538349 100644
--- a/test/unit/test_json_parser.py
+++ b/test/unit/test_json_parser.py
@@ -1,15 +1,13 @@
 from stream_alert.rule_processor.config import load_config
 from stream_alert.rule_processor.parsers import get_parser
-import zlib
+
+import json
 
 from nose.tools import (
-    assert_equal,
-    assert_not_equal,
-    nottest,
-    assert_raises,
-    raises
+    assert_equal,
 )
 
+
 class TestJSONParser(object):
     @classmethod
     def setup_class(cls):
@@ -49,11 +47,18 @@ def test_multi_nested_json(self):
             'result': 'string'
         }
         options = {
-            "hints": {
-                "records": "profiles.controls[*]"
+            'configuration': {
+                'json_path': 'profiles.controls[*]'
             }
         }
-        data = '{"profiles": {"controls": [{"name": "infra-test-1", "result": "fail"}]}}'
+        data = json.dumps({
+            'profiles': {
+                'controls': [{
+                    'name': 'test-infra-1',
+                    'result': 'fail'
+                }]
+            }
+        })
 
         # get parsed data
         parsed_data = self.parser_helper(data=data, schema=schema, options=options)
@@ -64,29 +69,40 @@ def test_inspec(self):
         """Parse Inspec JSON"""
         schema = self.config['logs']['test_inspec']['schema']
-        options = { "hints" : self.config['logs']['test_inspec']['hints'] }
+        options = {
+            'configuration': self.config['logs']['test_inspec']['configuration']
+        }
 
         # load fixture file
         with open('test/unit/fixtures/inspec.json', 'r') as fixture_file:
             data = fixture_file.readlines()
 
         data_record = data[0].strip()
         # setup json parser
-        parsed_result = self.parser_helper(data=data_record, schema=schema, options=options)
+        parsed_result = self.parser_helper(data=data_record,
+                                           schema=schema,
+                                           options=options)
+
         assert_equal(len(parsed_result), 2)
-        assert_equal(sorted((u'impact', u'code', u'tags', u'source_location', u'refs', u'title',
-                             u'results', u'id', u'desc')),sorted(parsed_result[0].keys()))
+        inspec_keys = (u'impact', u'code', u'tags', u'source_location', u'refs',
+                       u'title', u'results', u'id', u'desc')
+        assert_equal(sorted((inspec_keys)), sorted(parsed_result[0].keys()))
 
     def test_cloudtrail(self):
         """Parse Cloudtrail JSON"""
         schema = self.config['logs']['test_cloudtrail']['schema']
-        options = { "hints" : self.config['logs']['test_cloudtrail']['hints'] }
+        options = {
+            'configuration': self.config['logs']['test_cloudtrail']['configuration']
+        }
 
         # load fixture file
         with open('test/unit/fixtures/cloudtrail.json', 'r') as fixture_file:
             data = fixture_file.readlines()
 
         data_record = data[0].strip()
         # setup json parser
-        parsed_result = self.parser_helper(data=data_record, schema=schema, options=options)
+        parsed_result = self.parser_helper(data=data_record,
+                                           schema=schema,
+                                           options=options)
+
         assert_equal(len(parsed_result), 2)
         assert_equal(len(parsed_result[0].keys()), 14)
         assert_equal(len(parsed_result[1].keys()), 14)
@@ -122,3 +138,41 @@ def test_basic_json(self):
         assert_equal(set(parsed_data[0].keys()), {'name', 'age', 'city', 'state'})
         assert_equal(parsed_data[0]['name'], 'john')
         assert_equal(type(parsed_data[0]['age']), int)
+
+    def test_optional_keys_json(self):
+        """Parse JSON with optional top level keys"""
+        schema = {
+            'name': 'string',
+            'host': 'string',
+            'columns': {}
+        }
+        options = {
+            'configuration': {
+                'optional_top_level_keys': {
+                    'ids': [],
+                    'results': {},
+                    'host-id': 'integer',
+                    'valid': 'boolean'
+                }
+            }
+        }
+        data = json.dumps({
+            'name': 'unit-test',
+            'host': 'unit-test-host-1',
+            'columns': {
+                'test-column': 1
+            },
+            'valid': 'true'
+        })
+        parsed_result = self.parser_helper(data=data,
+                                           schema=schema,
+                                           options=options)
+
+        # tests
+        assert_equal(parsed_result[0]['host'], 'unit-test-host-1')
+        assert_equal(parsed_result[0]['valid'], 'true')
+
+        # test optional fields
+        assert_equal(parsed_result[0]['host-id'], 0)
+        assert_equal(parsed_result[0]['ids'], [])
+        assert_equal(parsed_result[0]['results'], {})
diff --git a/test/unit/test_kv_parser.py b/test/unit/test_kv_parser.py
index 42f31d9fe..d6c1584e7 100644
--- a/test/unit/test_kv_parser.py
+++ b/test/unit/test_kv_parser.py
@@ -39,9 +39,10 @@ def test_kv_parsing(self):
             'result': 'string'
         }
         options = {
-            'separator': ':',
-            'delimiter': ',',
-            'service': 'kinesis'
+            'configuration': {
+                'separator': ':',
+                'delimiter': ',',
+            }
         }
         data = 'name:joe bob,result:success'