Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[rule processor] Support data normalization #285

Merged
merged 9 commits into from
Sep 7, 2017
70 changes: 70 additions & 0 deletions conf/types.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
{
"carbonblack":{
"command": ["cmdline"],
"destinationAddress": ["remote_ip"],
"destinationDomain": ["domain"],
"destinationPort": ["remote_port"],
"deviceAddress": ["interface_ip", "comms_ip"],
"fileHash": ["process_md5", "parent_md5", "expect_followon_w_md5", "md5"],
"fileName": ["observed_filename", "file_path"],
"filePath": ["path"],
"processName": ["parent_name", "process_name"],
"processPath": ["parent_path", "process_path", "path"],
"sourceAddress": ["ipv4", "local_ip"],
"sourcePort": ["port", "local_port"],
"transportProtocol": ["protocol"],
"userName": ["username"]
},
"cloudwatch":{
"destinationAccount": ["recipientAccountId"],
"destinationAddress": ["destination"],
"destinationPort": ["destport"],
"eventName": ["eventName"],
"eventType": ["eventType"],
"region": ["region"],
"sourceAccount": ["account"],
"sourceAddress": ["source", "sourceIPAddress"],
"sourcePort": ["srcport"],
"transportProtocol": ["protocol"],
"userAgent": ["userAgent"],
"userName": ["userName", "owner", "invokedBy"]
},
"cloudtrail": {
"destinationAccount": ["recipientAccountId"],
"eventName": ["eventName"],
"eventType": ["eventType"],
"region": ["region", "awsRegion"],
"sourceAccount": ["account", "accountId"],
"sourceAddress": ["sourceIPAddress"],
"userAgent": ["userAgent"]
},
"ghe": {
"destinationAddress": ["remote_address"],
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see an example GHE log with remote_address - let's chat offline

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is defined in the GHE schema.

"sourcePort": ["port"],
"userName": ["current_user"]
},
"osquery": {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add "fileHash": ["md5", "sha1", "sha256"] (see https://osquery.io/docs/tables/#hash)

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add sourceUserId, have the array contain uid (see https://osquery.io/docs/tables/#users)

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add receiptTime, have the array contain unixTime (all osquery logs have this field, it denotes when the info was collected)

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add fileSize, have it contain size. ex: https://osquery.io/docs/tables/#file_events

"command": ["cmdline", "command"],
"destinationAddress": ["destination", "remote_address", "gateway"],
"destinationPort": ["remote_port"],
"fileHash": ["md5", "sha1", "sha256"],
"filePath": ["path", "directory"],
"fileSize": ["size"],
"message": ["message"],
"receiptTime": ["unixTime"],
"severity": ["severity"],
"sourceAddress": ["host", "source", "local_address", "address"],
"sourcePort": ["local_port", "port"],
"sourceUserId": ["uid"],
"transportProtocol": ["protocol"],
"userName": ["username", "user"]
},
"pan": {
"destinationAddress": ["dst", "natdst"],
"destinationPort": ["dport", "natdport"],
"sourceAddress": ["src", "natsrc"],
"sourcePort": ["sport", "natsport"],
"transportProtocol": ["proto"],
"userName": ["srcuser", "dstuser"]
}
}
23 changes: 23 additions & 0 deletions helpers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,3 +87,26 @@ def in_network(ip_address, cidrs):
if ip_address in network:
return True
return False

def fetch_values_by_datatype(rec, datatype):
    """Fetch values of a normalized type from a parsed record.

    Args:
        rec (dict): Parsed payload of any log.  May carry a
            'normalized_types' mapping of datatype name to a list of
            key paths, where each path is a list of nested keys.
        datatype (str): Normalized type the caller is interested in.

    Returns:
        (list) The values found for the normalized type; empty when
            the datatype is not present in the record.
    """
    results = []
    # Guard against records with a missing or None 'normalized_types'
    # entry instead of raising KeyError/TypeError.
    normalized_types = rec.get('normalized_types') or {}
    if datatype not in normalized_types:
        return results

    for original_keys in normalized_types[datatype]:
        # Walk the nested key path down to the stored value.
        result = rec
        if isinstance(original_keys, list):
            for original_key in original_keys:
                result = result[original_key]
        results.append(result)

    return results
3 changes: 3 additions & 0 deletions stream_alert/rule_processor/classifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,9 +276,12 @@ def _parse(self, payload):
schema_match.root_schema):
return False

normalized_types = self._config['types']

payload.log_source = schema_match.log_name
payload.type = schema_match.parser.type()
payload.records = schema_match.parsed_data
payload.normalized_types = normalized_types.get(payload.log_source.split(':')[0])

return True

Expand Down
3 changes: 1 addition & 2 deletions stream_alert/rule_processor/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def load_config(conf_dir='conf/'):
key denotes the name of the log type, and includes 'keys' used to match
rules to log fields.
"""
conf_files = ('sources', 'logs')
conf_files = ('sources', 'logs', 'types')
config = dict()
for base_name in conf_files:
path = '{}.json'.format(os.path.join(conf_dir, base_name))
Expand Down Expand Up @@ -88,7 +88,6 @@ def _validate_config(config):
raise ConfigError(
'List of \'logs\' is empty for entity: {}'.format(entity))


def load_env(context):
"""Get the current environment for the running Lambda function.

Expand Down
14 changes: 10 additions & 4 deletions stream_alert/rule_processor/payload.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,15 @@ def __init__(self, **kwargs):
self._refresh_record(None)

def __repr__(self):
repr_str = ('<{} valid:{} log_source:{} entity:{} type:{} '
'record:{}>').format(self.__class__.__name__, self.valid,
self.log_source, self.entity,
self.type, self.records)
repr_str = (
'<{} valid:{} log_source:{} entity:{} '
'type:{} record:{} normalized_types:{}>'
).format(
self.__class__.__name__, self.valid,
self.log_source, self.entity,
self.type, self.records,
self.normalized_types
)

return repr_str

Expand Down Expand Up @@ -122,6 +127,7 @@ def _refresh_record(self, new_record):
self.records = None
self.type = None
self.valid = False
self.normalized_types = None


class S3ObjectSizeError(Exception):
Expand Down
128 changes: 128 additions & 0 deletions stream_alert/rule_processor/rules_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
RuleAttributes = namedtuple('Rule', ['rule_name',
'rule_function',
'matchers',
'datatypes',
'logs',
'outputs',
'req_subkeys'])
Expand Down Expand Up @@ -66,6 +67,7 @@ def decorator(rule):
logs = opts.get('logs')
outputs = opts.get('outputs')
matchers = opts.get('matchers')
datatypes = opts.get('datatypes')
req_subkeys = opts.get('req_subkeys')

if not logs:
Expand All @@ -85,6 +87,7 @@ def decorator(rule):
cls.__rules[rule_name] = RuleAttributes(rule_name,
rule,
matchers,
datatypes,
logs,
outputs,
req_subkeys)
Expand Down Expand Up @@ -153,6 +156,127 @@ def match_event(cls, record, rule):

return True

@classmethod
def match_types(cls, record, normalized_types, datatypes):
"""Match normalized types against record

Args:
record (dict): Parsed payload of any log
normalized_types (dict): Normalized types
datatypes (list): defined in rule options, normalized_types users
interested in.

Returns:
(dict): A dict of normalized_types with original key names

Example 1:
datatypes=['defined_type1', 'defined_type2', 'not_defined_type']
This method will return an empty dictionary and log datatypes
"not defined" error to Logger.

Example 2:
datatypes=['defined_type1', 'defined_type2']
This method will return an dictionary :
{
"defined_type1": [[original_key1]],
"defined_type2": [[original_key2, sub_key2], [original_key3]]
}
"""
results = dict()
if not (datatypes and cls.validate_datatypes(normalized_types, datatypes)):
return results

return cls.match_types_helper(record, normalized_types, datatypes)

@classmethod
def match_types_helper(cls, record, normalized_types, datatypes):
"""Helper method to recursively visit all subkeys

Args:
record (dict): Parsed data
normalized_types (dict): Normalized types
datatypes (list): normalized types users interested in.

Returns:
(dict): A dict of normalized_types with original key names
"""
results = dict()
for key, val in record.iteritems():
if isinstance(val, dict):
nested_results = cls.match_types_helper(val, normalized_types, datatypes)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well done! 😸

cls.update(results, key, nested_results)
else:
for datatype in datatypes:
if key in normalized_types[datatype]:
if not datatype in results:
results[datatype] = [[key]]
else:
results[datatype].append([key])
return results

@classmethod
def update(cls, results, parent_key, nested_results):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a description of the args to this function's docstring?

"""Update nested_results by inserting parent key to beginning of list.
Also combine results and nested_results into one dictionary

Example 1:
results = {
'ipv4': [['key1']]
}
parent_key = 'key2'
nested_results = {
'username': [['sub_key1']],
'ipv4': [['sub_key2']]
}

This method will update nested_results to:
{
'username': [['key2', 'sub_key1']],
'ipv4': [['key2', 'sub_key2']]
}

Also it will combine nested_results to results:
{
'ipv4': [['key1'], ['key2', 'sub_key2']],
'username': [['key2', 'sub_key1']]
}
"""
for key, val in nested_results.iteritems():
if isinstance(val, list):
for item in val:
item.insert(0, parent_key)
else:
val.insert(0, parent_key)

if key in results:
results[key] += val
else:
if isinstance(val, list):
results[key] = val
else:
results[key] = [val]

@classmethod
def validate_datatypes(cls, normalized_types, datatypes):
"""Check is datatype valid

Args:
normalized_types (dict): normalized_types for certain log
datatypes (list): defined in rule options, users interested types

Returns:
(boolean): return true if all datatypes are defined
"""
if not normalized_types:
LOGGER.error('Normalized types dictionary is empty.')
return False

for datatype in datatypes:
if not datatype in normalized_types:
LOGGER.error('The datatype [%s] is not defined!', datatype)
return False
return True

@classmethod
def process_rule(cls, record, rule):
"""Process rule functions on a given record
Expand Down Expand Up @@ -250,6 +374,10 @@ def process(cls, input_payload):
if not matcher_result:
continue

types_result = cls.match_types(record,
payload.normalized_types,
rule.datatypes)
record['normalized_types'] = types_result
# rule analysis
rule_result = cls.process_rule(record, rule)
if rule_result:
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/rules/cloudtrail_root_account_usage.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
"account": 12345,
"region": "123456123456",
"detail-type": "...",
"source": "...",
"source": "1.1.1.2",
"version": "1.05",
"time": "...",
"id": "12345",
Expand Down
14 changes: 13 additions & 1 deletion tests/unit/conf/logs.json
Original file line number Diff line number Diff line change
Expand Up @@ -286,5 +286,17 @@
]
}
}
},
"cloudwatch:test_match_types": {
"schema": {
"account": "integer",
"region": "string",
"detail": {},
"source": "string"
},
"parser": "json",
"configuration": {
"json_path": "logEvents[*].extractedFields"
}
}
}
}
5 changes: 3 additions & 2 deletions tests/unit/conf/sources.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
"test_log_type_json_nested_with_data",
"test_log_type_csv",
"test_log_type_csv_nested",
"test_log_type_kv_auditd"
"test_log_type_kv_auditd",
"cloudwatch"
]
},
"test_stream_2": {
Expand All @@ -30,4 +31,4 @@
]
}
}
}
}
15 changes: 15 additions & 0 deletions tests/unit/conf/types.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"cloudwatch":{
"userName": ["userName", "owner", "invokedBy"],
"sourceAccount": ["account", "recipientAccountId"],
"transportProtocol": ["protocol"],
"eventType": ["eventType"],
"eventName": ["eventName"],
"region": ["region"],
"userAgent": ["userAgent"],
"sourceAddress": ["source", "sourceIPAddress"],
"destinationAddress": ["destination"],
"sourcePort": ["srcport"],
"destinationPort": ["destport"]
}
}
8 changes: 8 additions & 0 deletions tests/unit/stream_alert_rule_processor/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,3 +104,11 @@ def test_load_env():
assert_equal(env['lambda_function_name'],
'corp-prefix_prod_streamalert_rule_processor')
assert_equal(env['lambda_alias'], 'development')


def test_config_valid_types():
    """Config Validator - valid normalized types"""
    # Load a valid config
    config = load_config()

    # Should not raise; _validate_config raises ConfigError on a bad config
    _validate_config(config)
Loading