diff --git a/stream_alert/rule_processor/handler.py b/stream_alert/rule_processor/handler.py index 19f095f16..48a9683b8 100644 --- a/stream_alert/rule_processor/handler.py +++ b/stream_alert/rule_processor/handler.py @@ -56,6 +56,7 @@ def __init__(self, context, enable_alert_processor=True): self.enable_alert_processor = enable_alert_processor self._failed_record_count = 0 + self._processed_record_count = 0 self._processed_size = 0 self._alerts = [] @@ -82,14 +83,11 @@ def run(self, event): bool: True if all logs being parsed match a schema """ records = event.get('Records', []) - LOGGER.debug('Number of Records: %d', len(records)) + LOGGER.debug('Number of incoming records: %d', len(records)) if not records: return False - MetricLogger.log_metric(FUNCTION_NAME, MetricLogger.TOTAL_RECORDS, len(records)) - - firehose_config = self.config['global'].get( - 'infrastructure', {}).get('firehose', {}) + firehose_config = self.config['global'].get('infrastructure', {}).get('firehose', {}) if firehose_config.get('enabled'): self._firehose_client = StreamAlertFirehose(self.env['lambda_region'], firehose_config, @@ -129,6 +127,10 @@ def run(self, event): if record_alerts and self.enable_alert_processor: self.sinker.sink(record_alerts) + MetricLogger.log_metric(FUNCTION_NAME, + MetricLogger.TOTAL_RECORDS, + self._processed_record_count) + MetricLogger.log_metric(FUNCTION_NAME, MetricLogger.TOTAL_PROCESSED_SIZE, self._processed_size) @@ -182,6 +184,9 @@ def _process_alerts(self, payload): self._failed_record_count += 1 continue + # Increment the total processed records to get an accurate assessment of throughput + self._processed_record_count += len(record.records) + LOGGER.debug( 'Classified and Parsed Payload: ', record.valid, @@ -211,4 +216,5 @@ def _process_alerts(self, payload): if self.enable_alert_processor: self.sinker.sink(record_alerts) + return payload_with_normalized_records diff --git a/tests/unit/stream_alert_alert_processor/test_outputs/test_pagerduty.py b/tests/unit/stream_alert_alert_processor/test_outputs/test_pagerduty.py index 9cf64e317..4308c5b94 100644 --- a/tests/unit/stream_alert_alert_processor/test_outputs/test_pagerduty.py +++ b/tests/unit/stream_alert_alert_processor/test_outputs/test_pagerduty.py @@ -199,7 +199,7 @@ def test_check_exists_get_id_fail(self, get_mock): @patch('requests.get') def test_check_exists_no_get_id(self, get_mock): - """Check Exists No Get Id - PagerDutyIncidentOutput""" + """PagerDutyIncidentOutput - Check Exists No Get Id""" # /check get_mock.return_value.status_code = 200 json_check = {'check': [{'id': 'checked_id'}]} diff --git a/tests/unit/stream_alert_rule_processor/test_handler.py b/tests/unit/stream_alert_rule_processor/test_handler.py index e7794e5e9..979a7c76f 100644 --- a/tests/unit/stream_alert_rule_processor/test_handler.py +++ b/tests/unit/stream_alert_rule_processor/test_handler.py @@ -13,7 +13,7 @@ See the License for the specific language governing permissions and limitations under the License. """ -# pylint: disable=protected-access +# pylint: disable=protected-access,attribute-defined-outside-init import base64 import json import logging @@ -45,9 +45,6 @@ class TestStreamAlert(object): """Test class for StreamAlert class""" - def __init__(self): - self.__sa_handler = None - @patch('stream_alert.rule_processor.handler.load_config', lambda: load_config('tests/unit/conf/')) def setup(self): @@ -131,6 +128,14 @@ def test_run_with_alert(self, extract_mock, rules_mock): assert_true(passed) + @patch('stream_alert.rule_processor.handler.StreamClassifier.extract_service_and_entity') + def test_run_alert_count(self, extract_mock): + """StreamAlert Class - Run, Check Count With 4 Logs""" + count = 4 + extract_mock.return_value = ('kinesis', 'unit_test_default_stream') + self.__sa_handler.run(get_valid_event(count)) + assert_equal(self.__sa_handler._processed_record_count, count) + @patch('logging.Logger.debug') @patch('stream_alert.rule_processor.handler.StreamClassifier.extract_service_and_entity') def test_run_no_alerts(self, extract_mock, log_mock):