From 7093697a20dbe2e234d289b3b136cb8736189380 Mon Sep 17 00:00:00 2001
From: Chunyong Lin
Date: Mon, 21 Aug 2017 11:30:54 -0700
Subject: [PATCH] [rule processor] Support data normalization

* Add a configuration file conf/types.json
* Add data normalization logic in rule processor
* Add a sample rule for integration test
* Add unit test cases
---
 conf/types.json                                    |  65 ++++++++
 helpers/base.py                                    |  27 ++++
 .../cloudtrail_aws_access_by_evil.py               |  26 +++
 stream_alert/rule_processor/classifier.py          |   3 +
 stream_alert/rule_processor/config.py              |  10 +-
 stream_alert/rule_processor/payload.py             |  14 +-
 stream_alert/rule_processor/rules_engine.py        |  83 ++++++++++
 .../rules/cloudtrail_aws_access_by_evil.json       | 153 ++++++++++++++++++
 .../rules/cloudtrail_root_account_usage.json       |   2 +-
 tests/unit/conf/logs.json                          |  14 +-
 tests/unit/conf/sources.json                       |   5 +-
 tests/unit/conf/types.json                         |  13 ++
 .../test_config.py                                 |  15 ++
 .../test_helpers.py                                |   5 +
 .../test_payload.py                                |   3 +-
 .../test_rule_helpers.py                           |  47 ++++++
 .../test_rules_engine.py                           |  98 ++++++++++-
 17 files changed, 572 insertions(+), 11 deletions(-)
 create mode 100644 conf/types.json
 create mode 100644 rules/community/cloudtrail/cloudtrail_aws_access_by_evil.py
 create mode 100644 tests/integration/rules/cloudtrail_aws_access_by_evil.json
 create mode 100644 tests/unit/conf/types.json

diff --git a/conf/types.json b/conf/types.json
new file mode 100644
index 000000000..81b45affe
--- /dev/null
+++ b/conf/types.json
@@ -0,0 +1,65 @@
+{
+  "carbonblack":{
+    "username": ["username"],
+    "domain": ["domain"],
+    "process_path": ["parent_path", "process_path", "path"],
+    "protocol": ["protocol"],
+    "feed_source": ["feed_name"],
+    "process": ["parent_name", "process_name"],
+    "filename": ["observed_filename", "file_path"],
+    "command": ["cmdline"],
+    "md5_hash": ["process_md5", "parent_md5", "expect_followon_w_md5", "md5"],
+    "binary_score": ["report_score"],
+    "os_type": ["host_type", "os_type"],
+    "ipaddress": ["ipv4", "comms_ip", "interface_ip", "remote_ip", "local_ip"],
+    "port": ["port", "remote_port", "local_port"],
+    "src_host": ["other_hostnames", "server_name", "hostname", "computer_name"]
+  },
+  "cloudwatch":{
+    "username": ["userName", "owner", "invokedBy"],
+    "account": ["account", "recipientAccountId"],
+    "protocol": ["protocol"],
+    "event_type": ["eventType"],
+    "event_name": ["eventName"],
+    "region": ["region"],
+    "user_agent": ["userAgent"],
+    "ipaddress": ["destination", "source", "sourceIPAddress"],
+    "port": ["srcport", "destport"]
+  },
+  "cloudtrail": {
+    "account": ["account", "recipientAccountId", "accountId"],
+    "event_type": ["eventType"],
+    "event_name": ["eventName"],
+    "region": ["region", "awsRegion"],
+    "user_type": ["type"],
+    "user_agent": ["userAgent"],
+    "ipaddress": ["sourceIPAddress"]
+  },
+  "ghe": {
+    "process": ["program"],
+    "username": ["current_user"],
+    "ipaddress": ["remote_address"],
+    "port": ["port"],
+    "src_host": ["host"]
+  },
+  "osquery": {
+    "username": ["username", "user"],
+    "process_path": ["path"],
+    "protocol": ["protocol"],
+    "severity": ["severity"],
+    "cluster": ["envIdentifier"],
+    "role": ["roleIdentifier"],
+    "command": ["cmdline", "command"],
+    "message": ["message"],
+    "ipaddress": ["destination", "remote_address", "host", "source", "local_address", "gateway", "address"],
+    "port": ["local_port", "remote_port", "port"],
+    "src_host": ["hostIdentifier"]
+  },
+  "pan": {
+    "username": ["srcuser", "dstuser"],
+    "protocol": ["proto"],
+    "ipaddress": ["src", "natsrc", "dst", "natdst"],
+    "port": ["dport", "sport", "natsport", "natdport"],
+    "src_host": ["sourceName"]
+  }
+}
diff --git a/helpers/base.py b/helpers/base.py
index 56a5caf99..045b78e20 100644
--- a/helpers/base.py
+++ b/helpers/base.py
@@ -87,3 +87,30 @@ def in_network(ip_address, cidrs):
         if ip_address in network:
             return True
     return False
+
+def fetch_values_by_datatype(rec, datatype):
+    """Fetch values of a normalized type from a parsed record.
+
+    Args:
+        rec (dict): parsed payload of any log
+        datatype (str): normalized type the user is interested in
+
+    Returns:
+        (list) The values of the requested normalized type
+    """
+    results = []
+    if datatype not in rec['normalized_types']:
+        return results
+
+    for key in rec['normalized_types'][datatype]:
+        # A normalized type may live in a nested subkey; only one level of
+        # nesting is supported.
+        if isinstance(key, list):
+            if len(key) == 2:
+                results.append(rec[key[0]][key[1]])
+            else:
+                LOGGER.error('Invalid length of keys: %s, it should be 2', key)
+        else:
+            results.append(rec[key])
+
+    return results
diff --git a/rules/community/cloudtrail/cloudtrail_aws_access_by_evil.py b/rules/community/cloudtrail/cloudtrail_aws_access_by_evil.py
new file mode 100644
index 000000000..8f562c07b
--- /dev/null
+++ b/rules/community/cloudtrail/cloudtrail_aws_access_by_evil.py
@@ -0,0 +1,26 @@
+"""Alert on a matching IP address from AWS access events."""
+from stream_alert.rule_processor.rules_engine import StreamRules
+from helpers.base import fetch_values_by_datatype
+
+rule = StreamRules.rule
+
+
+@rule(logs=['cloudwatch:events'],
+      matchers=[],
+      datatypes=['ipaddress'],
+      outputs=['aws-s3:sample-bucket',
+               'pagerduty:sample-integration',
+               'slack:sample-channel'])
+def cloudtrail_aws_access_by_evil(rec):
+    """
+    author: airbnb_csirt
+    description: Sample rule that generates an alert using the normalized
+                 type "ipaddress".
+    """
+
+    results = fetch_values_by_datatype(rec, 'ipaddress')
+
+    for result in results:
+        if result == '1.1.1.2':
+            return True
+    return False
diff --git a/stream_alert/rule_processor/classifier.py b/stream_alert/rule_processor/classifier.py
index 3e35c85d1..d8c28b291 100644
--- a/stream_alert/rule_processor/classifier.py
+++ b/stream_alert/rule_processor/classifier.py
@@ -276,9 +276,12 @@ def _parse(self, payload):
                                         schema_match.root_schema):
             return False

+        normalized_types = self._config['types']
+
         payload.log_source = schema_match.log_name
         payload.type = schema_match.parser.type()
         payload.records = schema_match.parsed_data
+        payload.normalized_types = normalized_types.get(payload.log_source.split(':')[0])

         return True

diff --git a/stream_alert/rule_processor/config.py b/stream_alert/rule_processor/config.py
index 56b6647a8..ce529d7a1 100644
--- a/stream_alert/rule_processor/config.py
+++ b/stream_alert/rule_processor/config.py
@@ -34,7 +34,7 @@ def load_config(conf_dir='conf/'):
         key denotes the name of the log type, and includes 'keys' used to match
         rules to log fields.
     """
-    conf_files = ('sources', 'logs')
+    conf_files = ('sources', 'logs', 'types')
     config = dict()
     for base_name in conf_files:
         path = '{}.json'.format(os.path.join(conf_dir, base_name))
@@ -88,6 +88,14 @@ def _validate_config(config):
             raise ConfigError(
                 'List of \'logs\' is empty for entity: {}'.format(entity))

+    # validate supported normalized types
+    supported_logs = [
+        'carbonblack', 'cloudwatch', 'cloudtrail', 'ghe', 'osquery', 'pan'
+    ]
+    for log_type in config['types'].keys():
+        if log_type not in supported_logs:
+            raise ConfigError('Log type {} is not supported'.format(log_type))
+

 def load_env(context):
     """Get the current environment for the running Lambda function.
diff --git a/stream_alert/rule_processor/payload.py b/stream_alert/rule_processor/payload.py
index 1bfb2ae6e..f81d5170f 100644
--- a/stream_alert/rule_processor/payload.py
+++ b/stream_alert/rule_processor/payload.py
@@ -80,10 +80,15 @@ def __init__(self, **kwargs):
         self._refresh_record(None)

     def __repr__(self):
-        repr_str = ('<{} valid:{} log_source:{} entity:{} type:{} '
-                    'record:{}>').format(self.__class__.__name__, self.valid,
-                                         self.log_source, self.entity,
-                                         self.type, self.records)
+        repr_str = (
+            '<{} valid:{} log_source:{} entity:{} '
+            'type:{} record:{} normalized_types:{}>'
+        ).format(
+            self.__class__.__name__, self.valid,
+            self.log_source, self.entity,
+            self.type, self.records,
+            self.normalized_types
+        )

         return repr_str

@@ -122,6 +127,7 @@ def _refresh_record(self, new_record):
         self.records = None
         self.type = None
         self.valid = False
+        self.normalized_types = None


 class S3ObjectSizeError(Exception):
diff --git a/stream_alert/rule_processor/rules_engine.py b/stream_alert/rule_processor/rules_engine.py
index 231c4e624..0f0e494e3 100644
--- a/stream_alert/rule_processor/rules_engine.py
+++ b/stream_alert/rule_processor/rules_engine.py
@@ -24,6 +24,7 @@
 RuleAttributes = namedtuple('Rule', ['rule_name',
                                      'rule_function',
                                      'matchers',
+                                     'datatypes',
                                      'logs',
                                      'outputs',
                                      'req_subkeys'])
@@ -66,6 +67,7 @@ def decorator(rule):
             logs = opts.get('logs')
             outputs = opts.get('outputs')
             matchers = opts.get('matchers')
+            datatypes = opts.get('datatypes')
             req_subkeys = opts.get('req_subkeys')

             if not logs:
@@ -85,6 +87,7 @@ def decorator(rule):
             cls.__rules[rule_name] = RuleAttributes(rule_name,
                                                     rule,
                                                     matchers,
+                                                    datatypes,
                                                     logs,
                                                     outputs,
                                                     req_subkeys)
@@ -153,6 +156,81 @@ def match_event(cls, record, rule):

         return True

+    @classmethod
+    def match_types(cls, record, normalized_types, datatypes):
+        """Match normalized types against a record
+
+        Args:
+            record (dict): Parsed payload of any log
+            normalized_types (dict): Normalized types defined for the log source
+            datatypes (list): Normalized types the rule is interested in,
+                defined in the rule options.
+
+        Returns:
+            (dict): A dict mapping each matched normalized type to original key names
+
+        Example 1:
+            datatypes=['defined_type1', 'defined_type2', 'not_defined_type']
+            This method will return an empty dictionary and log a
+            "datatype not defined" error to the logger.
+
+        Example 2:
+            datatypes=['defined_type1', 'defined_type2']
+            This method will return a dictionary:
+                {
+                    "defined_type1": [original_key1],
+                    "defined_type2": [[original_key2, sub_key2], original_key3]
+                }
+        """
+        results = dict()
+        if not (datatypes and cls.validate_datatypes(normalized_types, datatypes)):
+            return results
+
+        for key, val in record.iteritems():  # pylint: disable=too-many-nested-blocks
+            # Iterate over nested keys if present; only one sub-level of
+            # nested keys is supported right now.
+            if isinstance(val, dict):
+                for sub_key, _ in val.iteritems():
+                    for datatype in datatypes:
+                        original_key_names = normalized_types[datatype]
+                        nested_keys = [key]
+                        if sub_key in original_key_names:
+                            nested_keys.append(sub_key)
+                            if datatype not in results:
+                                results[datatype] = [nested_keys]
+                            else:
+                                results[datatype].append(nested_keys)
+            else:
+                for datatype in datatypes:
+                    original_key_names = normalized_types[datatype]
+                    if key in original_key_names:
+                        if datatype not in results:
+                            results[datatype] = [key]
+                        else:
+                            results[datatype].append(key)
+        return results
+
+    @classmethod
+    def validate_datatypes(cls, normalized_types, datatypes):
+        """Validate that the datatypes are defined in normalized_types for a log
+
+        Args:
+            normalized_types (dict): normalized types for a certain log
+            datatypes (list): normalized types the rule is interested in
+
+        Returns:
+            (bool): True if all datatypes are defined, False otherwise
+        """
+        if not normalized_types:
+            LOGGER.error('Normalized_types is empty.')
+            return False
+
+        for datatype in datatypes:
+            if datatype not in normalized_types:
+                LOGGER.error('The datatype [%s] is not defined!', datatype)
+                return False
+        return True
+
     @classmethod
     def process_rule(cls, record, rule):
         """Process rule functions on a given record
@@ -250,6 +328,11 @@ def process(cls, input_payload):
                 if not matcher_result:
                     continue

+                types_result = cls.match_types(record,
+                                               payload.normalized_types,
+                                               rule.datatypes)
+                record.update({'normalized_types': types_result})
+
                 # rule analysis
                 rule_result = cls.process_rule(record, rule)
                 if rule_result:
diff --git a/tests/integration/rules/cloudtrail_aws_access_by_evil.json b/tests/integration/rules/cloudtrail_aws_access_by_evil.json
new file mode 100644
index 000000000..f69a77c5b
--- /dev/null
+++ b/tests/integration/rules/cloudtrail_aws_access_by_evil.json
@@ -0,0 +1,153 @@
+{
+  "records": [
+    {
+      "data": {
+        "account": 12345,
+        "region": "123456123456",
+        "detail-type": "...",
+        "source": "1.1.1.2",
+        "version": "1.05",
+        "time": "...",
+        "id": "12345",
+        "resources": {
+          "test": "..."
+        },
+        "detail": {
+          "eventVersion": "...",
+          "userIdentity": {
+            "type": "Root",
+            "principalId": "12345",
+            "arn": "arn:aws:iam::12345:root",
+            "accountId": "12345"
+          },
+          "eventTime": "...",
+          "eventSource": "...",
+          "eventName": "ConsoleLogin",
+          "awsRegion": "...",
+          "sourceIPAddress": "1.1.1.2",
+          "userAgent": "...",
+          "requestParameters": null,
+          "responseElements": {
+            "ConsoleLogin": "..."
+          },
+          "additionalEventData": {
+            "LoginTo": "...",
+            "MobileVersion": "No",
+            "MFAUsed": "Yes"
+          },
+          "eventID": "...",
+          "eventType": "AwsConsoleSignIn",
+          "recipientAccountId": "12345"
+        }
+      },
+      "description": "CloudTrail - AWS Access by Evil - True Positive",
+      "trigger": true,
+      "source": "prefix_cluster1_stream_alert_kinesis",
+      "service": "kinesis"
+    },
+    {
+      "data": {
+        "account": 12345,
+        "region": "123456123456",
+        "detail-type": "...",
+        "source": "2.2.2.2",
+        "version": "1.05",
+        "time": "...",
+        "id": "12345",
+        "resources": {
+          "test": "..."
+        },
+        "detail": {
+          "eventVersion": "...",
+          "userIdentity": {
+            "type": "Root",
+            "principalId": "...",
+            "arn": "...",
+            "accountId": "12345",
+            "userName": "...",
+            "sessionContext": {
+              "attributes": {
+                "mfaAuthenticated": "true",
+                "creationDate": "..."
+              }
+            },
+            "invokedBy": "signin.amazonaws.com"
+          },
+          "eventTime": "...",
+          "eventSource": "...",
+          "eventName": "...",
+          "awsRegion": "...",
+          "sourceIPAddress": "2.2.2.2",
+          "userAgent": "...",
+          "requestParameters": {
+            "bucketName": "...",
+            "AccessControlPolicy": {
+              "AccessControlList": {
+                "Grant": [
+                  {
+                    "Grantee": {
+                      "xsi:type": "CanonicalUser",
+                      "DisplayName": "...",
+                      "xmlns:xsi": "http://www.w3.org/2001/XMLSchema-instance",
+                      "ID": "..."
+                    },
+                    "Permission": "FULL_CONTROL"
+                  },
+                  {
+                    "Grantee": {
+                      "xsi:type": "Group",
+                      "xmlns:xsi": "http://www.w3.org/2001/XMLSchema-instance",
+                      "URI": "http://acs.amazonaws.com/groups/global/AuthenticatedUsers"
+                    },
+                    "Permission": "READ"
+                  },
+                  {
+                    "Grantee": {
+                      "xsi:type": "Group",
+                      "xmlns:xsi": "http://www.w3.org/2001/XMLSchema-instance",
+                      "URI": "http://acs.amazonaws.com/groups/global/AuthenticatedUsers"
+                    },
+                    "Permission": "READ_ACP"
+                  },
+                  {
+                    "Grantee": {
+                      "xsi:type": "Group",
+                      "xmlns:xsi": "http://www.w3.org/2001/XMLSchema-instance",
+                      "URI": "http://acs.amazonaws.com/groups/global/AllUsers"
+                    },
+                    "Permission": "READ"
+                  },
+                  {
+                    "Grantee": {
+                      "xsi:type": "Group",
+                      "xmlns:xsi": "http://www.w3.org/2001/XMLSchema-instance",
+                      "URI": "http://acs.amazonaws.com/groups/global/AllUsers"
+                    },
+                    "Permission": "READ_ACP"
+                  }
+                ]
+              },
+              "xmlns": "http://s3.amazonaws.com/doc/2006-03-01/",
+              "Owner": {
+                "DisplayName": "...",
+                "ID": "..."
+              }
+            },
+            "acl": [
+              ""
+            ]
+          },
+          "responseElements": null,
+          "requestID": "...",
+          "eventID": "...",
+          "eventType": "AwsApiCall",
+          "recipientAccountId": "12345"
+        }
+      },
+      "description": "CloudTrail - AWS Access by Evil - False Positive",
+      "trigger": false,
+      "source": "prefix_cluster1_stream_alert_kinesis",
+      "service": "kinesis"
+    }
+  ]
+}
diff --git a/tests/integration/rules/cloudtrail_root_account_usage.json b/tests/integration/rules/cloudtrail_root_account_usage.json
index 1b6d6a54c..94719a8d6 100644
--- a/tests/integration/rules/cloudtrail_root_account_usage.json
+++ b/tests/integration/rules/cloudtrail_root_account_usage.json
@@ -5,7 +5,7 @@
         "account": 12345,
         "region": "123456123456",
         "detail-type": "...",
-        "source": "...",
+        "source": "1.1.1.2",
         "version": "1.05",
         "time": "...",
         "id": "12345",
diff --git a/tests/unit/conf/logs.json b/tests/unit/conf/logs.json
index 6fbbfd694..a79bc4e6d 100644
--- a/tests/unit/conf/logs.json
+++ b/tests/unit/conf/logs.json
@@ -286,5 +286,17 @@
       ]
     }
   }
+  },
+  "cloudwatch:test_match_types": {
+    "schema": {
+      "account": "integer",
+      "region": "string",
+      "detail": {},
+      "source": "string"
+    },
+    "parser": "json",
+    "configuration": {
+      "json_path": "logEvents[*].extractedFields"
+    }
   }
-}
\ No newline at end of file
+}
diff --git a/tests/unit/conf/sources.json b/tests/unit/conf/sources.json
index 43b77e45f..be6a270b6 100644
--- a/tests/unit/conf/sources.json
+++ b/tests/unit/conf/sources.json
@@ -8,7 +8,8 @@
         "test_log_type_json_nested_with_data",
         "test_log_type_csv",
         "test_log_type_csv_nested",
-        "test_log_type_kv_auditd"
+        "test_log_type_kv_auditd",
+        "cloudwatch"
       ]
     },
     "test_stream_2": {
@@ -30,4 +31,4 @@
       ]
     }
   }
-}
\ No newline at end of file
+}
diff --git a/tests/unit/conf/types.json b/tests/unit/conf/types.json
new file mode 100644
index 000000000..e7b68e7b1
--- /dev/null
+++ b/tests/unit/conf/types.json
@@ -0,0 +1,13 @@
+{
+  "cloudwatch":{
+    "username": ["userName", "owner", "invokedBy"],
+    "account": ["account", "recipientAccountId"],
+    "protocol": ["protocol"],
+    "event_type": ["eventType"],
+    "event_name": ["eventName"],
+    "region": ["region"],
+    "user_agent": ["userAgent"],
+    "ipaddress": ["destination", "source", "sourceIPAddress"],
+    "port": ["srcport", "destport"]
+  }
+}
diff --git a/tests/unit/stream_alert_rule_processor/test_config.py b/tests/unit/stream_alert_rule_processor/test_config.py
index b662d02a0..f82e185bb 100644
--- a/tests/unit/stream_alert_rule_processor/test_config.py
+++ b/tests/unit/stream_alert_rule_processor/test_config.py
@@ -104,3 +104,18 @@ def test_load_env():
     assert_equal(env['lambda_function_name'],
                  'corp-prefix_prod_streamalert_rule_processor')
     assert_equal(env['lambda_alias'], 'development')
+
+@raises(ConfigError)
+def test_config_invalid_types():
+    """Config Validator - invalid normalized types"""
+    # Load a config fixture that contains an unsupported normalized log type
+    config = get_valid_config()
+
+    _validate_config(config)
+
+def test_config_valid_types():
+    """Config Validator - valid normalized types"""
+    # Load a valid config
+    config = load_config()
+
+    _validate_config(config)
diff --git a/tests/unit/stream_alert_rule_processor/test_helpers.py b/tests/unit/stream_alert_rule_processor/test_helpers.py
index 53625fc4a..72a5a8b46 100644
--- a/tests/unit/stream_alert_rule_processor/test_helpers.py
+++ b/tests/unit/stream_alert_rule_processor/test_helpers.py
@@ -65,6 +65,11 @@ def get_valid_config():
                 ]
             }
         }
+        },
+        'types': {
+            'log_type1': {
+                'command': ['cmdline', 'commandline']
+            }
         }
     }

diff --git a/tests/unit/stream_alert_rule_processor/test_payload.py b/tests/unit/stream_alert_rule_processor/test_payload.py
index aa1a5ca0e..2c4507014 100644
--- a/tests/unit/stream_alert_rule_processor/test_payload.py
+++ b/tests/unit/stream_alert_rule_processor/test_payload.py
@@ -69,7 +69,8 @@ def test_repr_string():
     s3_payload.records = ['rec1', 'rec2']
     print_value = ('<S3Payload valid:False log_source:unit_source '
                    'entity:entity type:None '
-                   'record:[\'rec1\', \'rec2\']>')
+                   'record:[\'rec1\', \'rec2\'] '
+                   'normalized_types:None>')

     output_print = s3_payload.__repr__()
     assert_equal(output_print, print_value)
diff --git a/tests/unit/stream_alert_rule_processor/test_rule_helpers.py b/tests/unit/stream_alert_rule_processor/test_rule_helpers.py
index 3b24fbcfc..10d39468a 100644
--- a/tests/unit/stream_alert_rule_processor/test_rule_helpers.py
+++ b/tests/unit/stream_alert_rule_processor/test_rule_helpers.py
@@ -73,3 +73,50 @@ def test_in_network():

     ip_not_in_cidr = '10.0.15.24'
     assert_equal(base.in_network(ip_not_in_cidr, cidrs), False)
+
+def test_fetch_values_by_datatype():
+    """Helpers - Fetch values from a record by normalized type"""
+    rec = {
+        u'account': 12345,
+        u'region': '123456123456',
+        u'detail': {
+            u'eventVersion': u'...',
+            u'eventID': u'...',
+            u'eventTime': u'...',
+            u'additionalEventData': {
+                u'MFAUsed': u'Yes',
+                u'LoginTo': u'...',
+                u'MobileVersion': u'No'
+            },
+            u'requestParameters': None,
+            u'eventType': u'AwsConsoleSignIn',
+            u'responseElements': {
+                u'ConsoleLogin': u'...'
+            },
+            u'awsRegion': u'...',
+            u'eventName': u'ConsoleLogin',
+            u'userIdentity': {
+                u'type': u'Root',
+                u'principalId': u'12345',
+                u'arn': u'arn:aws:iam::12345:root',
+                u'accountId': u'12345'
+            },
+            u'eventSource': u'...',
+            u'userAgent': u'...',
+            u'sourceIPAddress': u'1.1.1.2',
+            u'recipientAccountId': u'12345'
+        },
+        u'detail-type': '...',
+        u'source': '1.1.1.2',
+        u'version': '1.05',
+        'normalized_types': {
+            'ipaddress': [[u'detail', u'sourceIPAddress'], u'source']
+        },
+        u'time': '...',
+        u'id': '12345',
+        u'resources': {
+            u'test': u'...'
+        }
+    }
+    assert_equal(len(base.fetch_values_by_datatype(rec, 'ipaddress')), 2)
+    assert_equal(len(base.fetch_values_by_datatype(rec, 'command')), 0)
diff --git a/tests/unit/stream_alert_rule_processor/test_rules_engine.py b/tests/unit/stream_alert_rule_processor/test_rules_engine.py
index eeab2f08c..698943788 100644
--- a/tests/unit/stream_alert_rule_processor/test_rules_engine.py
+++ b/tests/unit/stream_alert_rule_processor/test_rules_engine.py
@@ -34,6 +34,7 @@
     load_and_classify_payload,
     make_kinesis_raw_record,
 )
+from helpers.base import fetch_values_by_datatype

 rule = StreamRules.rule
 matcher = StreamRules.matcher()
@@ -185,7 +186,9 @@ def test_nest(rec):  # pylint: disable=unused-variable
         # doing this because after kinesis_data is read in, types are casted per
         # the schema
         for alert in alerts:
-            assert_items_equal(alert['record'].keys(), kinesis_data.keys())
+            record_keys = alert['record'].keys()
+            record_keys.remove('normalized_types')
+            assert_items_equal(record_keys, kinesis_data.keys())
             assert_items_equal(alert['outputs'], rule_outputs_map[alert['rule_name']])

     def test_process_subkeys_nested_records(self):
@@ -199,6 +202,7 @@ def cloudtrail_us_east_logs(rec):
             rule_name='cloudtrail_us_east_logs',
             rule_function=cloudtrail_us_east_logs,
             matchers=[],
+            datatypes=[],
             logs=['test_log_type_json_nested'],
             outputs=['s3:sample_bucket'],
             req_subkeys={'requestParameters': ['program']}
@@ -467,3 +471,95 @@ def gid_500(rec):  # pylint: disable=unused-variable

         rule_name_alerts = [x['rule_name'] for x in alerts]
         assert_items_equal(rule_name_alerts, ['gid_500', 'auditd_bin_cat'])
+
+    def test_match_types(self):
+        """Rules Engine - Match normalized types against record"""
+        @rule(logs=['cloudwatch:test_match_types'],
+              outputs=['s3:sample_bucket'],
+              datatypes=['ipaddress'])
+        def match_ipaddress(rec):  # pylint: disable=unused-variable
+            """Testing rule to detect a matching IP address"""
+            results = fetch_values_by_datatype(rec, 'ipaddress')
+
+            for result in results:
+                if result == '1.1.1.2':
+                    return True
+            return False
+
+        @rule(logs=['cloudwatch:test_match_types'],
+              outputs=['s3:sample_bucket'],
+              datatypes=['ipaddress', 'command'])
+        def mismatch_types(rec):  # pylint: disable=unused-variable
+            """Testing rule with a normalized type that does not exist in the
+            record. It should not trigger an alert.
+            """
+            results = fetch_values_by_datatype(rec, 'ipaddress')
+
+            for result in results:
+                if result == '2.2.2.2':
+                    return True
+            return False
+
+        kinesis_data_items = [
+            {
+                'account': 123456,
+                'region': '123456123456',
+                'source': '1.1.1.2',
+                'detail': {
+                    'eventName': 'ConsoleLogin',
+                    'sourceIPAddress': '1.1.1.2',
+                    'recipientAccountId': '654321'
+                }
+            },
+            {
+                'account': 654321,
+                'region': '654321654321',
+                'source': '2.2.2.2',
+                'detail': {
+                    'eventName': 'ConsoleLogin',
+                    'sourceIPAddress': '2.2.2.2',
+                    'recipientAccountId': '123456'
+                }
+            }
+        ]
+
+        # prepare payloads
+        alerts = []
+        for data in kinesis_data_items:
+            kinesis_data = json.dumps(data)
+            # build a raw kinesis record and classify it
+            service, entity = 'kinesis', 'test_kinesis_stream'
+            raw_record = make_kinesis_raw_record(entity, kinesis_data)
+            payload = load_and_classify_payload(self.config, service, entity, raw_record)
+
+            alerts.extend(StreamRules.process(payload))
+
+        # check alert output
+        assert_equal(len(alerts), 1)
+
+        # alert tests
+        assert_equal(alerts[0]['rule_name'], 'match_ipaddress')
+
+    def test_validate_datatypes(self):
+        """Rules Engine - validate datatypes"""
+        normalized_types, datatypes = None, ['type1']
+        assert_equal(
+            StreamRules.validate_datatypes(normalized_types, datatypes),
+            False
+        )
+
+        normalized_types = {
+            'type1': ['key1'],
+            'type2': ['key2']
+        }
+        datatypes = ['type1']
+        assert_equal(
+            StreamRules.validate_datatypes(normalized_types, datatypes),
+            True
+        )
+
+        datatypes = ['type1', 'type3']
+        assert_equal(
+            StreamRules.validate_datatypes(normalized_types, datatypes),
+            False
+        )