Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[parsers] add cloudtrail via cloudwatch logs support #745

Merged
merged 5 commits into from
May 22, 2018
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions conf/logs.json
Original file line number Diff line number Diff line change
Expand Up @@ -1080,6 +1080,58 @@
},
"parser": "json"
},
"cloudwatch:cloudtrail": {
"schema": {
"additionalEventData": {},
"apiVersion": "string",
"awsRegion": "string",
"errorCode": "string",
"errorMessage": "string",
"eventID": "string",
"eventName": "string",
"eventSource": "string",
"eventTime": "string",
"eventType": "string",
"eventVersion": "string",
"managementEvent": "boolean",
"readOnly": "boolean",
"recipientAccountId": "string",
"requestID": "string",
"requestParameters": {},
"resources": [],
"responseElements": {},
"serviceEventDetails": {},
"sharedEventID": "string",
"sourceIPAddress": "string",
"userAgent": "string",
"userIdentity": {},
"vpcEndpointId": "string"
},
"parser": "json",
"configuration": {
"embedded_json": true,
"envelope_keys": {
"logGroup": "string",
"logStream": "string",
"messageType": "string",
"owner": "string",
"subscriptionFilters": []
},
"json_path": "logEvents[*].message",
"optional_top_level_keys": [
"additionalEventData",
"apiVersion",
"errorCode",
"errorMessage",
"managementEvent",
"readOnly",
"resources",
"serviceEventDetails",
"sharedEventID",
"vpcEndpointId"
]
}
},
"cloudtrail:events": {
"schema": {
"additionalEventData": {},
Expand Down
36 changes: 29 additions & 7 deletions stream_alert/rule_processor/parsers.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ def _parse_records(self, schema, json_payload):
json_payload (dict): The parsed json data

Returns:
list: A list of JSON recrods extracted via JSONPath.
list: A list of parsed JSON records
"""
# Check options and return the payload if there is nothing special to do
if not self.options:
Expand Down Expand Up @@ -244,8 +244,28 @@ def _parse_records(self, schema, json_payload):
envelope_matches = [match.value for match in envelope_jsonpath.find(json_payload)]
envelope = dict(zip(envelope_keys, envelope_matches))

json_records = self._extract_records(json_payload, envelope)
if json_records is False:
return False

# If the final parsed record is singular
if not json_records:
json_records.append(json_payload)

return json_records

def _extract_records(self, json_payload, envelope):
"""Extract records from the original json payload using the JSON configuration

Args:
json_payload (dict): The parsed json data

Returns:
list: A list of JSON records extracted via JSON path or regex
"""
json_records = []
json_path_expression = self.options.get('json_path')
json_regex_key = self.options.get('json_regex_key')
# Handle jsonpath extraction of records
if json_path_expression:
LOGGER.debug('Parsing records with JSONPath')
Expand All @@ -255,13 +275,19 @@ def _parse_records(self, schema, json_payload):
return False
for match in matches:
record = match.value
embedded_json = self.options.get('embedded_json')
if embedded_json:
try:
record = json.loads(match.value)
except ValueError:
LOGGER.debug('Embedded json is invalid')
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be a warning? If I understand correctly, this logic parses embedded json (if specified) for each matched log schema. If that fails, then the config is probably wrong? Or will this happen more frequently?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's probably safe to have it as a warning or an error. Just updated!

continue
if envelope:
record.update({ENVELOPE_KEY: envelope})
json_records.append(record)

# Handle nested json object regex matching
json_regex_key = self.options.get('json_regex_key')
if json_regex_key and json_payload.get(json_regex_key):
elif json_regex_key and json_payload.get(json_regex_key):
LOGGER.debug('Parsing records with JSON Regex Key')
match = self.__regex.search(str(json_payload[json_regex_key]))
if not match:
Expand All @@ -282,10 +308,6 @@ def _parse_records(self, schema, json_payload):

json_records.append(new_record)

# If the final parsed record is singular
if not json_records:
json_records.append(json_payload)

return json_records

@time_me
Expand Down
15 changes: 15 additions & 0 deletions tests/unit/conf/logs.json
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,21 @@
},
"parser": "json"
},
"json:embedded": {
"schema": {
"nested_key_01": "string",
"nested_key_02": "string"
},
"parser": "json",
"configuration": {
"embedded_json": true,
"envelope_keys": {
"env_key_01": "string",
"env_key_02": "string"
},
"json_path": "test_list[*].message"
}
},
"json:regex_key_with_envelope": {
"schema": {
"nested_key_1": "string",
Expand Down
50 changes: 46 additions & 4 deletions tests/unit/stream_alert_rule_processor/test_parsers.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from collections import OrderedDict
import json

from mock import patch
from mock import call, patch
from nose.tools import (
assert_equal,
assert_false,
Expand Down Expand Up @@ -411,15 +411,57 @@ def test_json_regex_key(self):
'"nested_key_2": "more_nested_info",'
'"nested_key_3": "even_more"}'
})
parsed_result = self.parser_helper(data=data,
schema=schema,
options=options)
parsed_result = self.parser_helper(data=data, schema=schema, options=options)

assert_items_equal(parsed_result[0].keys(),
['nested_key_1',
'nested_key_2',
'nested_key_3'])

def test_embedded_json(self):
    """JSON Parser - Embedded JSON"""
    # Pull the schema and parser configuration for the embedded-json log type
    log_config = self.config['logs']['json:embedded']

    # Envelope record whose 'message' field carries a JSON-encoded string
    record = {
        'env_key_01': 'data',
        'env_key_02': 'time',
        'test_list': [
            {
                'id': 'foo',
                'message': '{"nested_key_01":"bar","nested_key_02":"baz"}'
            }
        ]
    }

    parsed_result = self.parser_helper(
        data=json.dumps(record),
        schema=log_config['schema'],
        options=log_config['configuration']
    )

    # The parsed record should expose the embedded keys plus the envelope key,
    # and the envelope should carry both configured envelope keys
    assert_items_equal(
        parsed_result[0].keys(),
        {'nested_key_01', 'nested_key_02', 'streamalert:envelope_keys'}
    )
    assert_items_equal(
        parsed_result[0]['streamalert:envelope_keys'],
        {'env_key_01', 'env_key_02'}
    )

@patch('logging.Logger.debug')
def test_embedded_json_invalid(self, log_mock):
    """JSON Parser - Embedded JSON, Invalid"""
    # Pull the schema and parser configuration for the embedded-json log type
    log_config = self.config['logs']['json:embedded']

    # The 'message' field holds a string that is NOT valid JSON
    payload = json.dumps({
        'env_key_01': 'data',
        'env_key_02': 'time',
        'test_list': [
            {
                'id': 'foo',
                'message': '{"invalid_json"}'
            }
        ]
    })

    result = self.parser_helper(
        data=payload,
        schema=log_config['schema'],
        options=log_config['configuration']
    )

    # Parsing should fail outright and the failure should be logged
    assert_equal(result, False)
    log_mock.assert_has_calls([call('Embedded json is invalid')])

def test_basic_json(self):
"""JSON Parser - Non-nested JSON objects"""
# setup
Expand Down