[rule processor] Support data normalization
* Add a configuration file conf/types.json
* Add data normalization logic in rule processor
* Add a sample rule for integration test
* Add unit test cases
Chunyong Lin committed Aug 31, 2017
1 parent ec648d1 commit 7093697
Showing 17 changed files with 572 additions and 11 deletions.
65 changes: 65 additions & 0 deletions conf/types.json
@@ -0,0 +1,65 @@
{
"carbonblack":{
"username": ["username"],
"domain": ["domain"],
"process_path": ["parent_path", "process_path", "path"],
"protocol": ["protocol"],
"feed_source": ["feed_name"],
"process": ["parent_name", "process_name"],
"filename": ["observed_filename", "file_path"],
"command": ["cmdline"],
"md5_hash": ["process_md5", "parent_md5", "expect_followon_w_md5", "md5"],
"binary_score": ["report_score"],
"os_type": ["host_type", "os_type"],
"ipaddress": ["ipv4", "comms_ip", "interface_ip", "remote_ip", "local_ip"],
"port": ["port", "remote_port", "local_port"],
"src_host": ["other_hostnames", "server_name", "hostname", "computer_name"]
},
"cloudwatch":{
"username": ["userName", "owner", "invokedBy"],
"account": ["account", "recipientAccountId"],
"protocol": ["protocol"],
"event_type": ["eventType"],
"event_name": ["eventName"],
"region": ["region"],
"user_agent": ["userAgent"],
"ipaddress": ["destination", "source", "sourceIPAddress"],
"port": ["srcport", "destport"]
},
"cloudtrail": {
"account": ["account", "recipientAccountId", "accountId"],
"event_type": ["eventType"],
"event_name": ["eventName"],
"region": ["region", "awsRegion"],
"user_type": ["type"],
"user_agent": ["userAgent"],
"ipaddress": ["sourceIPAddress"]
},
"ghe": {
"process": ["program"],
"username": ["current_user"],
"ipaddress": ["remote_address"],
"port": ["port"],
"src_host": ["host"]
},
"osquery": {
"username": ["username", "user"],
"process_path": ["path"],
"protocol": ["protocol"],
"severity": ["severity"],
"cluster": ["envIdentifier"],
"role": ["roleIdentifier"],
"command": ["cmdline", "command"],
"message": ["message"],
"ipaddress": ["destination", "remote_address", "host", "source", "local_address", "gateway", "address"],
"port": ["local_port", "remote_port", "port"],
"src_host": ["hostIdentifier"]
},
"pan": {
"username": ["srcuser", "dstuser"],
"protocol": ["proto"],
"ipaddress": ["src", "natsrc", "dst", "natdst"],
"port": ["dport", "sport", "natsport", "natdport"],
"src_host": ["sourceName"]
}
}
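
Each top-level key in conf/types.json is a log source prefix, and each entry maps a normalized type name to the raw field names that may carry it. A minimal sketch of the lookup, assuming the file is read straight from disk:

import json

# Load the normalized-type mappings added in this commit
with open('conf/types.json') as types_file:
    types = json.load(types_file)

# For PAN logs, the single normalized type 'ipaddress' covers every raw
# address field, so a rule no longer needs to know the vendor's key names
assert types['pan']['ipaddress'] == ['src', 'natsrc', 'dst', 'natdst']
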
27 changes: 27 additions & 0 deletions helpers/base.py
@@ -87,3 +87,30 @@ def in_network(ip_address, cidrs):
if ip_address in network:
return True
return False

def fetch_values_by_datatype(rec, datatype):
"""Fetch values of normalized_type.
Args:
rec (dict): parsed payload of any log
datatype (str): normalized type user interested
Returns:
(list) The values of normalized types
"""
results = []
if datatype not in rec['normalized_types']:
return results

for key in rec['normalized_types'][datatype]:
# The normalized type may live in a nested subkey; only one level of
# nesting is supported.
if isinstance(key, list):
if len(key) == 2:
results.append(rec[key[0]][key[1]])
else:
LOGGER.error('Invalid length of keys: %s, it should be 2', key)
else:
results.append(rec[key])

return results
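
A hedged usage sketch of fetch_values_by_datatype; the record below is hypothetical and already carries the 'normalized_types' mapping that the rules engine attaches before rules run:

from helpers.base import fetch_values_by_datatype

# Hypothetical GHE-style record; 'normalized_types' mirrors the shape produced
# by StreamRules.match_types(): flat keys as strings, nested keys as
# two-element [key, sub_key] lists
record = {
    'remote_address': '203.0.113.7',
    'data': {'current_user': 'octocat'},
    'normalized_types': {
        'ipaddress': ['remote_address'],
        'username': [['data', 'current_user']],
    },
}

assert fetch_values_by_datatype(record, 'ipaddress') == ['203.0.113.7']
assert fetch_values_by_datatype(record, 'username') == ['octocat']
assert fetch_values_by_datatype(record, 'port') == []  # type not present in this record
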
26 changes: 26 additions & 0 deletions rules/community/cloudtrail/cloudtrail_aws_access_by_evil.py
@@ -0,0 +1,26 @@
"""Alert on matching IP address from aws access."""
from stream_alert.rule_processor.rules_engine import StreamRules
from helpers.base import fetch_values_by_datatype

rule = StreamRules.rule


@rule(logs=['cloudwatch:events'],
matchers=[],
datatypes=['ipaddress'],
outputs=['aws-s3:sample-bucket',
'pagerduty:sample-integration',
'slack:sample-channel'])
def cloudtrail_aws_access_by_evil(rec):
"""
author: airbnb_csirt
description: Sample rule that alerts using the normalized type "ipaddress".
"""

results = fetch_values_by_datatype(rec, 'ipaddress')

for result in results:
if result == '1.1.1.2':
return True
return False
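
For reference, a hypothetical cloudwatch:events fragment that would satisfy this rule: conf/types.json maps 'sourceIPAddress' to the 'ipaddress' type for cloudwatch, so fetch_values_by_datatype returns ['1.1.1.2'] and the rule fires.

# Hypothetical, already-normalized record fragment for illustration only
rec = {
    'sourceIPAddress': '1.1.1.2',
    'normalized_types': {'ipaddress': ['sourceIPAddress']},
}
# fetch_values_by_datatype(rec, 'ipaddress') == ['1.1.1.2'] -> rule returns True
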
3 changes: 3 additions & 0 deletions stream_alert/rule_processor/classifier.py
@@ -276,9 +276,12 @@ def _parse(self, payload):
schema_match.root_schema):
return False

normalized_types = self._config['types']

payload.log_source = schema_match.log_name
payload.type = schema_match.parser.type()
payload.records = schema_match.parsed_data
payload.normalized_types = normalized_types.get(payload.log_source.split(':')[0])

return True
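
The classifier keys into conf/types.json by the log source prefix only, so every sub-type of a source shares one set of normalized types. A small sketch of that lookup, assuming config is the dict returned by load_config and using a hypothetical log source name:

# 'carbonblack:watchlist.hit.process' is a hypothetical log source name;
# only the prefix before ':' selects the entry in conf/types.json
log_source = 'carbonblack:watchlist.hit.process'
normalized_types = config['types'].get(log_source.split(':')[0])
# normalized_types now holds the "carbonblack" block from conf/types.json
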

10 changes: 9 additions & 1 deletion stream_alert/rule_processor/config.py
@@ -34,7 +34,7 @@ def load_config(conf_dir='conf/'):
key denotes the name of the log type, and includes 'keys' used to match
rules to log fields.
"""
conf_files = ('sources', 'logs')
conf_files = ('sources', 'logs', 'types')
config = dict()
for base_name in conf_files:
path = '{}.json'.format(os.path.join(conf_dir, base_name))
@@ -88,6 +88,14 @@ def _validate_config(config):
raise ConfigError(
'List of \'logs\' is empty for entity: {}'.format(entity))

# validate supported normalized types
supported_logs = [
'carbonblack', 'cloudwatch', 'cloudtrail', 'ghe', 'osquery', 'pan'
]
for log_type in config['types'].keys():
if log_type not in supported_logs:
raise ConfigError('Log type {} is not supported'.format(log_type))


def load_env(context):
"""Get the current environment for the running Lambda function.
14 changes: 10 additions & 4 deletions stream_alert/rule_processor/payload.py
@@ -80,10 +80,15 @@ def __init__(self, **kwargs):
self._refresh_record(None)

def __repr__(self):
repr_str = ('<{} valid:{} log_source:{} entity:{} type:{} '
'record:{}>').format(self.__class__.__name__, self.valid,
self.log_source, self.entity,
self.type, self.records)
repr_str = (
'<{} valid:{} log_source:{} entity:{} '
'type:{} record:{} normalized_types:{}>'
).format(
self.__class__.__name__, self.valid,
self.log_source, self.entity,
self.type, self.records,
self.normalized_types
)

return repr_str

@@ -122,6 +127,7 @@ def _refresh_record(self, new_record):
self.records = None
self.type = None
self.valid = False
self.normalized_types = None


class S3ObjectSizeError(Exception):
83 changes: 83 additions & 0 deletions stream_alert/rule_processor/rules_engine.py
@@ -24,6 +24,7 @@
RuleAttributes = namedtuple('Rule', ['rule_name',
'rule_function',
'matchers',
'datatypes',
'logs',
'outputs',
'req_subkeys'])
@@ -66,6 +67,7 @@ def decorator(rule):
logs = opts.get('logs')
outputs = opts.get('outputs')
matchers = opts.get('matchers')
datatypes = opts.get('datatypes')
req_subkeys = opts.get('req_subkeys')

if not logs:
@@ -85,6 +87,7 @@ def decorator(rule):
cls.__rules[rule_name] = RuleAttributes(rule_name,
rule,
matchers,
datatypes,
logs,
outputs,
req_subkeys)
@@ -153,6 +156,81 @@ def match_event(cls, record, rule):

return True

@classmethod
def match_types(cls, record, normalized_types, datatypes):
"""Match normalized types against record
Args:
record (dict): Parsed payload of any log
normalized_types (dict): Normalized types
datatypes (list): defined in rule options, normalized_types users
interested in.
Returns:
(dict): A dict of normalized_types with original key names
Example 1:
datatypes=['defined_type1', 'defined_type2', 'not_defined_type']
This method will return an empty dictionary and log datatypes
"not defined" error to Logger.
Example 2:
datatypes=['defined_type1', 'defined_type2']
This method will return an dictionary :
{
"defined_type1": [original_key1],
"defined_type2": [[original_key2, sub_key2], original_key3]
}
"""
results = dict()
if not (datatypes and cls.validate_datatypes(normalized_types, datatypes)):
return results

for key, val in record.iteritems(): # pylint: disable=too-many-nested-blocks
# Iterate over nested keys if present; only one level of nesting
# is supported right now
if isinstance(val, dict):
for sub_key, _ in val.iteritems():
for datatype in datatypes:
original_key_names = normalized_types[datatype]
nested_keys = [key]
if sub_key in original_key_names:
nested_keys.append(sub_key)
if datatype not in results:
results[datatype] = [nested_keys]
else:
results[datatype].append(nested_keys)
else:
for datatype in datatypes:
original_key_names = normalized_types[datatype]
if key in original_key_names:
if datatype not in results:
results[datatype] = [key]
else:
results[datatype].append(key)
return results

@classmethod
def validate_datatypes(cls, normalized_types, datatypes):
"""validate if datatypes valid in normalized_types for certain log
Args:
normalized_types (dict): normalized_types for certain log
datatypes (list): defined in rule options, users interested types
Returns:
(boolean): return true if all datatypes are defined
"""
if not normalized_types:
LOGGER.error('Normalized_types is empty.')
return False

for datatype in datatypes:
if datatype not in normalized_types:
LOGGER.error('The datatype [%s] is not defined!', datatype)
return False
return True

@classmethod
def process_rule(cls, record, rule):
"""Process rule functions on a given record
@@ -250,6 +328,11 @@ def process(cls, input_payload):
if not matcher_result:
continue

types_result = cls.match_types(record,
payload.normalized_types,
rule.datatypes)
record.update({'normalized_types': types_result})

# rule analysis
rule_result = cls.process_rule(record, rule)
if rule_result:
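To make the return shape of match_types concrete, here is a hedged sketch for a hypothetical osquery record, using the 'osquery' block from conf/types.json; it assumes config is the loaded StreamAlert config, and the record, its field values, and the expected output are illustrative only:

record = {
    'hostIdentifier': 'web-01.example.com',
    'columns': {
        'remote_address': '10.0.0.5',
        'username': 'root',
    },
}

matched = StreamRules.match_types(
    record,
    config['types']['osquery'],
    ['ipaddress', 'username', 'src_host'],
)
# Expected result, with nested fields expressed as [key, sub_key] pairs:
# {
#     'ipaddress': [['columns', 'remote_address']],
#     'username': [['columns', 'username']],
#     'src_host': ['hostIdentifier'],
# }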
