diff --git a/conf/logs.json b/conf/logs.json
index 470c8e9db..fd452ec4e 100644
--- a/conf/logs.json
+++ b/conf/logs.json
@@ -1080,6 +1080,58 @@
     },
     "parser": "json"
   },
+  "cloudwatch:cloudtrail": {
+    "schema": {
+      "additionalEventData": {},
+      "apiVersion": "string",
+      "awsRegion": "string",
+      "errorCode": "string",
+      "errorMessage": "string",
+      "eventID": "string",
+      "eventName": "string",
+      "eventSource": "string",
+      "eventTime": "string",
+      "eventType": "string",
+      "eventVersion": "string",
+      "managementEvent": "boolean",
+      "readOnly": "boolean",
+      "recipientAccountId": "string",
+      "requestID": "string",
+      "requestParameters": {},
+      "resources": [],
+      "responseElements": {},
+      "serviceEventDetails": {},
+      "sharedEventID": "string",
+      "sourceIPAddress": "string",
+      "userAgent": "string",
+      "userIdentity": {},
+      "vpcEndpointId": "string"
+    },
+    "parser": "json",
+    "configuration": {
+      "embedded_json": true,
+      "envelope_keys": {
+        "logGroup": "string",
+        "logStream": "string",
+        "messageType": "string",
+        "owner": "string",
+        "subscriptionFilters": []
+      },
+      "json_path": "logEvents[*].message",
+      "optional_top_level_keys": [
+        "additionalEventData",
+        "apiVersion",
+        "errorCode",
+        "errorMessage",
+        "managementEvent",
+        "readOnly",
+        "resources",
+        "serviceEventDetails",
+        "sharedEventID",
+        "vpcEndpointId"
+      ]
+    }
+  },
   "cloudtrail:events": {
     "schema": {
       "additionalEventData": {},
diff --git a/stream_alert/rule_processor/parsers.py b/stream_alert/rule_processor/parsers.py
index 779ec9375..2ad3d3088 100644
--- a/stream_alert/rule_processor/parsers.py
+++ b/stream_alert/rule_processor/parsers.py
@@ -211,7 +211,7 @@ def _parse_records(self, schema, json_payload):
             json_payload (dict): The parsed json data
 
         Returns:
-            list: A list of JSON recrods extracted via JSONPath.
+            list: A list of parsed JSON records
         """
         # Check options and return the payload if there is nothing special to do
         if not self.options:
@@ -244,8 +244,28 @@ def _parse_records(self, schema, json_payload):
             envelope_matches = [match.value for match in envelope_jsonpath.find(json_payload)]
             envelope = dict(zip(envelope_keys, envelope_matches))
 
+        json_records = self._extract_records(json_payload, envelope)
+        if json_records is False:
+            return False
+
+        # If the final parsed record is singular
+        if not json_records:
+            json_records.append(json_payload)
+
+        return json_records
+
+    def _extract_records(self, json_payload, envelope):
+        """Extract records from the original json payload using the JSON configuration
+
+        Args:
+            json_payload (dict): The parsed json data
+
+        Returns:
+            list: A list of JSON records extracted via JSON path or regex
+        """
         json_records = []
         json_path_expression = self.options.get('json_path')
+        json_regex_key = self.options.get('json_regex_key')
         # Handle jsonpath extraction of records
         if json_path_expression:
             LOGGER.debug('Parsing records with JSONPath')
@@ -255,13 +275,19 @@ def _parse_records(self, schema, json_payload):
                 return False
             for match in matches:
                 record = match.value
+                embedded_json = self.options.get('embedded_json')
+                if embedded_json:
+                    try:
+                        record = json.loads(match.value)
+                    except ValueError:
+                        LOGGER.warning('Embedded json is invalid')
+                        continue
                 if envelope:
                     record.update({ENVELOPE_KEY: envelope})
                 json_records.append(record)
 
         # Handle nested json object regex matching
-        json_regex_key = self.options.get('json_regex_key')
-        if json_regex_key and json_payload.get(json_regex_key):
+        elif json_regex_key and json_payload.get(json_regex_key):
             LOGGER.debug('Parsing records with JSON Regex Key')
             match = self.__regex.search(str(json_payload[json_regex_key]))
             if not match:
@@ -282,10 +308,6 @@ def _parse_records(self, schema, json_payload):
 
             json_records.append(new_record)
 
-        # If the final parsed record is singular
-        if not json_records:
-            json_records.append(json_payload)
-
         return json_records
 
     @time_me
diff --git a/tests/integration/rules/cloudwatch/cloudtrail_via_cloudwatch.json b/tests/integration/rules/cloudwatch/cloudtrail_via_cloudwatch.json
new file mode 100644
index 000000000..cf8dbdeaa
--- /dev/null
+++ b/tests/integration/rules/cloudwatch/cloudtrail_via_cloudwatch.json
@@ -0,0 +1,26 @@
+[
+  {
+    "data": {
+      "messageType": "DATA_MESSAGE",
+      "owner": "123456789012",
+      "logGroup": "CloudTrail/DefaultLogGroup",
+      "logStream": "123456789012_CloudTrail_us-east-1",
+      "subscriptionFilters": [
+        "FooBarSubscription"
+      ],
+      "logEvents": [
+        {
+          "id": "44056647182143267075860006634052172261824828947338793472",
+          "timestamp": 1526951139360,
+          "message": "{\"eventVersion\": \"foo\", \"eventID\": \"bar\", \"eventTime\": \"foo\", \"sharedEventID\": \"bar\", \"additionalEventData\": {}, \"requestParameters\": {}, \"eventType\": \"foo\", \"responseElements\": {}, \"awsRegion\": \"foo\", \"eventName\": \"bar\", \"readOnly\": true, \"userIdentity\": {}, \"eventSource\": \"foo\", \"requestID\": \"bar\", \"userAgent\": \"foo\", \"sourceIPAddress\": \"bar\", \"resources\": \"foo\", \"recipientAccountId\": \"bar\"}"
+        }
+      ]
+    },
+    "description": "CloudTrail logs via CloudWatch logs DATA_MESSAGE (validation only)",
+    "log": "cloudwatch:cloudtrail",
+    "service": "kinesis",
+    "source": "prefix_cluster1_stream_alert_kinesis",
+    "trigger_rules": [],
+    "validate_schema_only": true
+  }
+]
\ No newline at end of file
diff --git a/tests/unit/conf/logs.json b/tests/unit/conf/logs.json
index 175e45947..eecd08065 100644
--- a/tests/unit/conf/logs.json
+++ b/tests/unit/conf/logs.json
@@ -307,6 +307,21 @@
     },
     "parser": "json"
   },
+  "json:embedded": {
+    "schema": {
+      "nested_key_01": "string",
+      "nested_key_02": "string"
+    },
+    "parser": "json",
+    "configuration": {
+      "embedded_json": true,
+      "envelope_keys": {
+        "env_key_01": "string",
+        "env_key_02": "string"
+      },
+      "json_path": "test_list[*].message"
+    }
+  },
   "json:regex_key_with_envelope": {
     "schema": {
       "nested_key_1": "string",
diff --git a/tests/unit/stream_alert_rule_processor/test_parsers.py b/tests/unit/stream_alert_rule_processor/test_parsers.py
index 144e58100..daef2357b 100644
--- a/tests/unit/stream_alert_rule_processor/test_parsers.py
+++ b/tests/unit/stream_alert_rule_processor/test_parsers.py
@@ -16,7 +16,7 @@
 from collections import OrderedDict
 import json
 
-from mock import patch
+from mock import call, patch
 from nose.tools import (
     assert_equal,
     assert_false,
@@ -411,15 +411,57 @@ def test_json_regex_key(self):
             '"nested_key_2": "more_nested_info",'
             '"nested_key_3": "even_more"}'
         })
-        parsed_result = self.parser_helper(data=data,
-                                           schema=schema,
-                                           options=options)
+        parsed_result = self.parser_helper(data=data, schema=schema, options=options)
 
         assert_items_equal(parsed_result[0].keys(),
                            ['nested_key_1',
                             'nested_key_2',
                             'nested_key_3'])
 
+    def test_embedded_json(self):
+        """JSON Parser - Embedded JSON"""
+        schema = self.config['logs']['json:embedded']['schema']
+        options = self.config['logs']['json:embedded']['configuration']
+
+        data = json.dumps({
+            'env_key_01': 'data',
+            'env_key_02': 'time',
+            'test_list': [
+                {
+                    'id': 'foo',
+                    'message': ('{\"nested_key_01\":\"bar\",'
+                                '\"nested_key_02\":\"baz\"}')
+                }
+            ]
+        })
+
+        parsed_result = self.parser_helper(data=data, schema=schema, options=options)
+        expected_keys = {'nested_key_01', 'nested_key_02', 'streamalert:envelope_keys'}
+        expected_env_keys = {'env_key_01', 'env_key_02'}
+        assert_items_equal(parsed_result[0].keys(), expected_keys)
+        assert_items_equal(parsed_result[0]['streamalert:envelope_keys'], expected_env_keys)
+
+    @patch('logging.Logger.warning')
+    def test_embedded_json_invalid(self, log_mock):
+        """JSON Parser - Embedded JSON, Invalid"""
+        schema = self.config['logs']['json:embedded']['schema']
+        options = self.config['logs']['json:embedded']['configuration']
+
+        data = json.dumps({
+            'env_key_01': 'data',
+            'env_key_02': 'time',
+            'test_list': [
+                {
+                    'id': 'foo',
+                    'message': '{\"invalid_json\"}'
+                }
+            ]
+        })
+
+        result = self.parser_helper(data=data, schema=schema, options=options)
+        assert_equal(result, False)
+        log_mock.assert_has_calls([call('Embedded json is invalid')])
+
     def test_basic_json(self):
         """JSON Parser - Non-nested JSON objects"""
         # setup
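
Note for reviewers: the core of this change is the new embedded_json handling in _extract_records, where each JSONPath match is itself a serialized JSON string that must be deserialized before the envelope keys are attached. Below is a minimal standalone sketch of that flow; the function name extract_embedded_records is illustrative only (not part of the patch), and jsonpath_rw is assumed as the JSONPath library, since the surrounding parser code already uses parse()/find() with .value matches.

    import json
    import logging

    from jsonpath_rw import parse

    LOGGER = logging.getLogger(__name__)
    # Key under which envelope values are attached; matches the unit tests above
    ENVELOPE_KEY = 'streamalert:envelope_keys'

    def extract_embedded_records(json_payload, json_path, envelope=None):
        """Deserialize JSON strings found at json_path and attach envelope keys.

        With 'embedded_json' enabled, each JSONPath match is expected to be a
        string of serialized JSON (e.g. the CloudTrail record inside a
        CloudWatch logEvents[*].message), so it is loaded before use.
        """
        records = []
        for match in parse(json_path).find(json_payload):
            try:
                record = json.loads(match.value)
            except ValueError:
                LOGGER.warning('Embedded json is invalid')
                continue  # skip records that fail to deserialize
            if envelope:
                record.update({ENVELOPE_KEY: envelope})
            records.append(record)
        return records

For the cloudwatch:cloudtrail log defined above, json_path is "logEvents[*].message" and envelope carries the five envelope_keys values (logGroup, logStream, messageType, owner, subscriptionFilters), which land on each record under the streamalert:envelope_keys key.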