[Data Normalization] Make logs optional when datatypes is defined #307

Merged · 3 commits · Sep 12, 2017
2 changes: 2 additions & 0 deletions stream_alert/alert_processor/main.py
@@ -133,6 +133,8 @@ def _sort_dict(unordered_dict):
"""
result = OrderedDict()
for key, value in sorted(unordered_dict.items(), key=lambda t: t[0]):
if key == 'normalized_types':
continue
if isinstance(value, dict):
result[key] = _sort_dict(value)
continue
6 changes: 2 additions & 4 deletions stream_alert/rule_processor/payload.py
@@ -82,12 +82,11 @@ def __init__(self, **kwargs):
    def __repr__(self):
        repr_str = (
            '<{} valid:{} log_source:{} entity:{} '
-           'type:{} record:{} normalized_types:{}>'
+           'type:{} record:{}>'
        ).format(
            self.__class__.__name__, self.valid,
            self.log_source, self.entity,
-           self.type, self.records,
-           self.normalized_types
+           self.type, self.records
        )

        return repr_str
@@ -127,7 +126,6 @@ def _refresh_record(self, new_record):
        self.records = None
        self.type = None
        self.valid = False
-       self.normalized_types = None


class S3ObjectSizeError(Exception):
21 changes: 11 additions & 10 deletions stream_alert/rule_processor/rules_engine.py
@@ -70,9 +70,10 @@ def decorator(rule):
        datatypes = opts.get('datatypes')
        req_subkeys = opts.get('req_subkeys')

-       if not logs:
+       if not (logs or datatypes):
            LOGGER.error(
-               'Invalid rule [%s] - rule must have \'logs\' declared',
+               'Invalid rule [%s] - rule must have either \'logs\' or \''
+               'datatypes\' declared',
                rule_name)
            return
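
As a reading aid, a minimal sketch (not part of this diff) of what the relaxed check accepts and rejects. The decorator kwargs mirror the new tests at the bottom of this PR; the rule names here are hypothetical.

# Sketch only; assumes @rule and fetch_values_by_datatype are imported
# as in tests/unit/stream_alert_rule_processor/test_rules_engine.py.

@rule(datatypes=['sourceAddress'],     # no logs=[...] -- valid after this change
      outputs=['s3:sample_bucket'])
def datatypes_only_rule(rec):          # hypothetical rule
    """Fire on any log source whose schema normalizes sourceAddress"""
    results = fetch_values_by_datatype(rec, 'sourceAddress')
    return any(result == '1.1.1.2' for result in results)

@rule(outputs=['s3:sample_bucket'])    # neither logs nor datatypes -- the
def rejected_rule(rec):                # decorator now logs the error above
    return True                        # and the rule is never registered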

@@ -202,6 +203,8 @@ def match_types_helper(cls, record, normalized_types, datatypes):
"""
results = dict()
for key, val in record.iteritems():
if key == 'normalized_types':
continue
if isinstance(val, dict):
nested_results = cls.match_types_helper(val, normalized_types, datatypes)
cls.update(results, key, nested_results)
@@ -274,12 +277,10 @@ def validate_datatypes(cls, normalized_types, datatypes):
            (boolean): return true if all datatypes are defined
        """
        if not normalized_types:
-           LOGGER.error('Normalized types dictionary is empty.')
            return False

        for datatype in datatypes:
            if not datatype in normalized_types:
-               LOGGER.error('The datatype [%s] is not defined!', datatype)
                return False
        return True

Contributor:
Is this logging statement no longer helpful? I understand why the one above was removed, but can you explain why this one was removed as well?

Contributor Author:
Since we removed the log_sources constraint, one rule can now match records from multiple log sources, while normalized types dictionaries are defined per log source. It is not an error if a normalized type such as foo is not defined for Record A's log source; the record simply does not match, and returning False is enough. Does that make sense to you?

@@ -362,7 +363,7 @@ def process(cls, input_payload):
        payload = copy(input_payload)

        rules = [rule_attrs for rule_attrs in cls.__rules.values()
-                if payload.log_source in rule_attrs.logs]
+                if rule_attrs.logs is None or payload.log_source in rule_attrs.logs]

        if not rules:
            LOGGER.debug('No rules to process for %s', payload)
@@ -379,11 +380,11 @@
                matcher_result = cls.match_event(record, rule)
                if not matcher_result:
                    continue
-
-               types_result = cls.match_types(record,
-                                              payload.normalized_types,
-                                              rule.datatypes)
-               record['normalized_types'] = types_result
+               if rule.datatypes:
+                   types_result = cls.match_types(record,
+                                                  payload.normalized_types,
+                                                  rule.datatypes)
+                   record['normalized_types'] = types_result
                # rule analysis
                rule_result = cls.process_rule(record, rule)
                if rule_result:
3 changes: 1 addition & 2 deletions tests/unit/stream_alert_rule_processor/test_payload.py
@@ -69,8 +69,7 @@ def test_repr_string():
    s3_payload.records = ['rec1', 'rec2']
    print_value = ('<S3Payload valid:False log_source:unit_source '
                   'entity:entity type:unit_type '
-                  'record:[\'rec1\', \'rec2\'] '
-                  'normalized_types:None>')
+                  'record:[\'rec1\', \'rec2\']>')

    output_print = s3_payload.__repr__()
    assert_equal(output_print, print_value)
99 changes: 96 additions & 3 deletions tests/unit/stream_alert_rule_processor/test_rules_engine.py
@@ -186,9 +186,9 @@ def test_nest(rec):  # pylint: disable=unused-variable
        # doing this because after kinesis_data is read in, types are cast per
        # the schema
        for alert in alerts:
-           record_keys = alert['record'].keys()
-           record_keys.remove('normalized_types')
-           assert_items_equal(record_keys, kinesis_data.keys())
+           if 'normalized_types' in alert['record']:
+               del alert['record']['normalized_types']
+           assert_items_equal(alert['record'].keys(), kinesis_data.keys())
            assert_items_equal(alert['outputs'], rule_outputs_map[alert['rule_name']])

    def test_process_subkeys_nested_records(self):
@@ -673,3 +673,96 @@ def test_match_types_helper(self):
            ]
        }
        assert_equal(results, expected_results)
+
+    def test_process_optional_logs(self):
+        """Rules Engine - Logs are optional when datatypes are present"""
+        @rule(datatypes=['sourceAddress'],
+              outputs=['s3:sample_bucket'])
+        def no_logs_has_datatypes(rec):  # pylint: disable=unused-variable
+            """Testing rule when logs is not present, datatypes is"""
+            results = fetch_values_by_datatype(rec, 'sourceAddress')
+
+            for result in results:
+                if result == '1.1.1.2':
+                    return True
+            return False
+
+        @rule(logs=['cloudwatch:test_match_types'],
+              outputs=['s3:sample_bucket'])
+        def has_logs_no_datatypes(rec):  # pylint: disable=unused-variable
+            """Testing rule when logs is present, datatypes is not"""
+            return (
+                rec['source'] == '1.1.1.2' or
+                rec['detail']['sourceIPAddress'] == '1.1.1.2'
+            )
+
+        @rule(logs=['cloudwatch:test_match_types'],
+              datatypes=['sourceAddress'],
+              outputs=['s3:sample_bucket'])
+        def has_logs_datatypes(rec):  # pylint: disable=unused-variable
+            """Testing rule when both logs and datatypes are present"""
+            results = fetch_values_by_datatype(rec, 'sourceAddress')
+
+            for result in results:
+                if result == '1.1.1.2':
+                    return True
+            return False
+
+        kinesis_data_items = [
+            {
+                'account': 123456,
+                'region': '123456123456',
+                'source': '1.1.1.2',
+                'detail': {
+                    'eventName': 'ConsoleLogin',
+                    'sourceIPAddress': '1.1.1.2',
+                    'recipientAccountId': '654321'
+                }
+            }
+        ]
+
+        alerts = []
+        for data in kinesis_data_items:
+            kinesis_data = json.dumps(data)
+            service, entity = 'kinesis', 'test_kinesis_stream'
+            raw_record = make_kinesis_raw_record(entity, kinesis_data)
+            payload = load_and_classify_payload(self.config, service, entity, raw_record)
+
+            alerts.extend(StreamRules.process(payload))
+
+        assert_equal(len(alerts), 3)
+        rule_names = ['no_logs_has_datatypes',
+                      'has_logs_no_datatypes',
+                      'has_logs_datatypes']
+        assert_items_equal([alerts[i]['rule_name'] for i in range(3)], rule_names)
+
+    def test_process_required_logs(self):
+        """Rules Engine - Logs are required when no datatypes are defined"""
+        @rule(outputs=['s3:sample_bucket'])
+        def match_ipaddress():  # pylint: disable=unused-variable
+            """Testing rule to detect matching IP address"""
+            return True
+
+        kinesis_data_items = [
+            {
+                'account': 123456,
+                'region': '123456123456',
+                'source': '1.1.1.2',
+                'detail': {
+                    'eventName': 'ConsoleLogin',
+                    'sourceIPAddress': '1.1.1.2',
+                    'recipientAccountId': '654321'
+                }
+            }
+        ]
+
+        for data in kinesis_data_items:
+            kinesis_data = json.dumps(data)
+            service, entity = 'kinesis', 'test_kinesis_stream'
+            raw_record = make_kinesis_raw_record(entity, kinesis_data)
+            payload = load_and_classify_payload(self.config, service, entity, raw_record)
+
+            assert_false(StreamRules.process(payload))