[parsers] add cloudtrail via cloudwatch logs support (#745)
* [parsers] adding support for loading embedded json

* [test] tests for embedded json support

* [schemas] schema for cloudtrail log via cloudwatch logs

* adding validation test event for cloudtrail via cloudwatch

* addressing pr feedback
ryandeivert authored May 22, 2018
1 parent c1dd599 commit 1ae5bd2
Showing 5 changed files with 168 additions and 11 deletions.
52 changes: 52 additions & 0 deletions conf/logs.json
@@ -1080,6 +1080,58 @@
},
"parser": "json"
},
"cloudwatch:cloudtrail": {
"schema": {
"additionalEventData": {},
"apiVersion": "string",
"awsRegion": "string",
"errorCode": "string",
"errorMessage": "string",
"eventID": "string",
"eventName": "string",
"eventSource": "string",
"eventTime": "string",
"eventType": "string",
"eventVersion": "string",
"managementEvent": "boolean",
"readOnly": "boolean",
"recipientAccountId": "string",
"requestID": "string",
"requestParameters": {},
"resources": [],
"responseElements": {},
"serviceEventDetails": {},
"sharedEventID": "string",
"sourceIPAddress": "string",
"userAgent": "string",
"userIdentity": {},
"vpcEndpointId": "string"
},
"parser": "json",
"configuration": {
"embedded_json": true,
"envelope_keys": {
"logGroup": "string",
"logStream": "string",
"messageType": "string",
"owner": "string",
"subscriptionFilters": []
},
"json_path": "logEvents[*].message",
"optional_top_level_keys": [
"additionalEventData",
"apiVersion",
"errorCode",
"errorMessage",
"managementEvent",
"readOnly",
"resources",
"serviceEventDetails",
"sharedEventID",
"vpcEndpointId"
]
}
},
"cloudtrail:events": {
"schema": {
"additionalEventData": {},
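Taken together, this configuration treats the outer CloudWatch Logs payload as an envelope (envelope_keys), selects each logEvents[*].message with JSONPath (json_path), and decodes each message string as its own CloudTrail record (embedded_json). A minimal sketch of that flow, using the jsonpath_rw library the parser already relies on and a hypothetical sample payload:

import json
from jsonpath_rw import parse as jsonpath_parse

# Hypothetical CloudWatch Logs subscription payload wrapping one CloudTrail event
payload = {
    'messageType': 'DATA_MESSAGE',
    'owner': '123456789012',
    'logGroup': 'CloudTrail/DefaultLogGroup',
    'logStream': '123456789012_CloudTrail_us-east-1',
    'subscriptionFilters': ['FooBarSubscription'],
    'logEvents': [
        {'id': '1', 'message': '{"eventName": "DescribeInstances", "awsRegion": "us-east-1"}'}
    ]
}

# 'json_path': 'logEvents[*].message' selects each embedded message string
messages = [match.value for match in jsonpath_parse('logEvents[*].message').find(payload)]

# 'embedded_json': true means each selected value is itself serialized JSON
records = [json.loads(message) for message in messages]
print(records[0]['eventName'])  # DescribeInstances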
36 changes: 29 additions & 7 deletions stream_alert/rule_processor/parsers.py
@@ -211,7 +211,7 @@ def _parse_records(self, schema, json_payload):
json_payload (dict): The parsed json data
Returns:
-            list: A list of JSON recrods extracted via JSONPath.
+            list: A list of parsed JSON records
"""
# Check options and return the payload if there is nothing special to do
if not self.options:
@@ -244,8 +244,28 @@ def _parse_records(self, schema, json_payload):
envelope_matches = [match.value for match in envelope_jsonpath.find(json_payload)]
envelope = dict(zip(envelope_keys, envelope_matches))

json_records = self._extract_records(json_payload, envelope)
if json_records is False:
return False

# If the final parsed record is singular
if not json_records:
json_records.append(json_payload)

return json_records

def _extract_records(self, json_payload, envelope):
"""Extract records from the original json payload using the JSON configuration
Args:
json_payload (dict): The parsed json data
Returns:
list: A list of JSON records extracted via JSON path or regex
"""
json_records = []
json_path_expression = self.options.get('json_path')
json_regex_key = self.options.get('json_regex_key')
# Handle jsonpath extraction of records
if json_path_expression:
LOGGER.debug('Parsing records with JSONPath')
@@ -255,13 +275,19 @@ def _parse_records(self, schema, json_payload):
return False
for match in matches:
record = match.value
embedded_json = self.options.get('embedded_json')
if embedded_json:
try:
record = json.loads(match.value)
except ValueError:
LOGGER.warning('Embedded json is invalid')
continue
if envelope:
record.update({ENVELOPE_KEY: envelope})
json_records.append(record)

# Handle nested json object regex matching
-        json_regex_key = self.options.get('json_regex_key')
-        if json_regex_key and json_payload.get(json_regex_key):
+        elif json_regex_key and json_payload.get(json_regex_key):
LOGGER.debug('Parsing records with JSON Regex Key')
match = self.__regex.search(str(json_payload[json_regex_key]))
if not match:
@@ -282,10 +308,6 @@ def _parse_records(self, schema, json_payload):

json_records.append(new_record)

-        # If the final parsed record is singular
-        if not json_records:
-            json_records.append(json_payload)

return json_records

@time_me
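Condensed, the new embedded-JSON handling behaves roughly like the sketch below; extract_embedded_records is a hypothetical standalone helper, with the envelope key name and warning message taken from the diff:

import json
import logging

LOGGER = logging.getLogger(__name__)
ENVELOPE_KEY = 'streamalert:envelope_keys'

def extract_embedded_records(matches, envelope, embedded_json):
    """Decode JSONPath match values, optionally parsing each as embedded JSON"""
    json_records = []
    for value in matches:
        record = value
        if embedded_json:
            try:
                record = json.loads(value)
            except ValueError:
                # Matches the parser's behavior: warn and skip the bad record
                LOGGER.warning('Embedded json is invalid')
                continue
        if envelope:
            record.update({ENVELOPE_KEY: envelope})
        json_records.append(record)
    return json_records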
26 changes: 26 additions & 0 deletions tests/integration/rules/cloudwatch/cloudtrail_via_cloudwatch.json
@@ -0,0 +1,26 @@
[
{
"data": {
"messageType": "DATA_MESSAGE",
"owner": "123456789012",
"logGroup": "CloudTrail/DefaultLogGroup",
"logStream": "123456789012_CloudTrail_us-east-1",
"subscriptionFilters": [
"FooBarSubscription"
],
"logEvents": [
{
"id": "44056647182143267075860006634052172261824828947338793472",
"timestamp": 1526951139360,
"message": "{\"eventVersion\": \"foo\", \"eventID\": \"bar\", \"eventTime\": \"foo\", \"sharedEventID\": \"bar\", \"additionalEventData\": {}, \"requestParameters\": {}, \"eventType\": \"foo\", \"responseElements\": {}, \"awsRegion\": \"foo\", \"eventName\": \"bar\", \"readOnly\": true, \"userIdentity\": {}, \"eventSource\": \"foo\", \"requestID\": \"bar\", \"userAgent\": \"foo\", \"sourceIPAddress\": \"bar\", \"resources\": \"foo\", \"recipientAccountId\": \"bar\"}"
}
]
},
"description": "CloudTrail logs via CloudWatch logs DATA_MESSAGE (validation only)",
"log": "cloudwatch:cloudtrail",
"service": "kinesis",
"source": "prefix_cluster1_stream_alert_kinesis",
"trigger_rules": [],
"validate_schema_only": true
}
]
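Each logEvents[*].message in this fixture is a serialized JSON string rather than a nested object, which is precisely the case the new embedded_json option handles. Decoding one by hand from the fixture above:

import json

with open('tests/integration/rules/cloudwatch/cloudtrail_via_cloudwatch.json') as fixture:
    events = json.load(fixture)

message = events[0]['data']['logEvents'][0]['message']
record = json.loads(message)  # the string decodes into a CloudTrail event dict
print(record['eventName'], record['recipientAccountId'])  # bar bar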
15 changes: 15 additions & 0 deletions tests/unit/conf/logs.json
@@ -307,6 +307,21 @@
},
"parser": "json"
},
"json:embedded": {
"schema": {
"nested_key_01": "string",
"nested_key_02": "string"
},
"parser": "json",
"configuration": {
"embedded_json": true,
"envelope_keys": {
"env_key_01": "string",
"env_key_02": "string"
},
"json_path": "test_list[*].message"
}
},
"json:regex_key_with_envelope": {
"schema": {
"nested_key_1": "string",
50 changes: 46 additions & 4 deletions tests/unit/stream_alert_rule_processor/test_parsers.py
@@ -16,7 +16,7 @@
from collections import OrderedDict
import json

-from mock import patch
+from mock import call, patch
from nose.tools import (
assert_equal,
assert_false,
@@ -411,15 +411,57 @@ def test_json_regex_key(self):
'"nested_key_2": "more_nested_info",'
'"nested_key_3": "even_more"}'
})
-        parsed_result = self.parser_helper(data=data,
-                                           schema=schema,
-                                           options=options)
+        parsed_result = self.parser_helper(data=data, schema=schema, options=options)

assert_items_equal(parsed_result[0].keys(),
['nested_key_1',
'nested_key_2',
'nested_key_3'])

def test_embedded_json(self):
"""JSON Parser - Embedded JSON"""
schema = self.config['logs']['json:embedded']['schema']
options = self.config['logs']['json:embedded']['configuration']

data = json.dumps({
'env_key_01': 'data',
'env_key_02': 'time',
'test_list': [
{
'id': 'foo',
'message': ('{\"nested_key_01\":\"bar\",'
'\"nested_key_02\":\"baz\"}')
}
]
})

parsed_result = self.parser_helper(data=data, schema=schema, options=options)
expected_keys = {'nested_key_01', 'nested_key_02', 'streamalert:envelope_keys'}
expected_env_keys = {'env_key_01', 'env_key_02'}
assert_items_equal(parsed_result[0].keys(), expected_keys)
assert_items_equal(parsed_result[0]['streamalert:envelope_keys'], expected_env_keys)

@patch('logging.Logger.warning')
def test_embedded_json_invalid(self, log_mock):
"""JSON Parser - Embedded JSON, Invalid"""
schema = self.config['logs']['json:embedded']['schema']
options = self.config['logs']['json:embedded']['configuration']

data = json.dumps({
'env_key_01': 'data',
'env_key_02': 'time',
'test_list': [
{
'id': 'foo',
'message': '{\"invalid_json\"}'
}
]
})

result = self.parser_helper(data=data, schema=schema, options=options)
assert_equal(result, False)
log_mock.assert_has_calls([call('Embedded json is invalid')])

def test_basic_json(self):
"""JSON Parser - Non-nested JSON objects"""
# setup
