Skip to content

Commit

Permalink
Merge pull request #548 from airbnb/ryandeivert-more-accurate-record-…
Browse files Browse the repository at this point in the history
…count-metric

[metrics] getting a more accurate processed record count in rp handler
  • Loading branch information
ryandeivert authored Jan 3, 2018
2 parents e0f147e + b5fe094 commit 53d4207
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 10 deletions.
16 changes: 11 additions & 5 deletions stream_alert/rule_processor/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ def __init__(self, context, enable_alert_processor=True):

self.enable_alert_processor = enable_alert_processor
self._failed_record_count = 0
self._processed_record_count = 0
self._processed_size = 0
self._alerts = []

Expand All @@ -82,14 +83,11 @@ def run(self, event):
bool: True if all logs being parsed match a schema
"""
records = event.get('Records', [])
LOGGER.debug('Number of Records: %d', len(records))
LOGGER.debug('Number of incoming records: %d', len(records))
if not records:
return False

MetricLogger.log_metric(FUNCTION_NAME, MetricLogger.TOTAL_RECORDS, len(records))

firehose_config = self.config['global'].get(
'infrastructure', {}).get('firehose', {})
firehose_config = self.config['global'].get('infrastructure', {}).get('firehose', {})
if firehose_config.get('enabled'):
self._firehose_client = StreamAlertFirehose(self.env['lambda_region'],
firehose_config,
Expand Down Expand Up @@ -129,6 +127,10 @@ def run(self, event):
if record_alerts and self.enable_alert_processor:
self.sinker.sink(record_alerts)

MetricLogger.log_metric(FUNCTION_NAME,
MetricLogger.TOTAL_RECORDS,
self._processed_record_count)

MetricLogger.log_metric(FUNCTION_NAME,
MetricLogger.TOTAL_PROCESSED_SIZE,
self._processed_size)
Expand Down Expand Up @@ -182,6 +184,9 @@ def _process_alerts(self, payload):
self._failed_record_count += 1
continue

# Increment the total processed records to get an accurate assessment of throughput
self._processed_record_count += len(record.records)

LOGGER.debug(
'Classified and Parsed Payload: <Valid: %s, Log Source: %s, Entity: %s>',
record.valid,
Expand Down Expand Up @@ -211,4 +216,5 @@ def _process_alerts(self, payload):

if self.enable_alert_processor:
self.sinker.sink(record_alerts)

return payload_with_normalized_records
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ def test_check_exists_get_id_fail(self, get_mock):

@patch('requests.get')
def test_check_exists_no_get_id(self, get_mock):
"""Check Exists No Get Id - PagerDutyIncidentOutput"""
"""PagerDutyIncidentOutput - Check Exists No Get Id"""
# /check
get_mock.return_value.status_code = 200
json_check = {'check': [{'id': 'checked_id'}]}
Expand Down
13 changes: 9 additions & 4 deletions tests/unit/stream_alert_rule_processor/test_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
See the License for the specific language governing permissions and
limitations under the License.
"""
# pylint: disable=protected-access
# pylint: disable=protected-access,attribute-defined-outside-init
import base64
import json
import logging
Expand Down Expand Up @@ -45,9 +45,6 @@
class TestStreamAlert(object):
"""Test class for StreamAlert class"""

def __init__(self):
self.__sa_handler = None

@patch('stream_alert.rule_processor.handler.load_config',
lambda: load_config('tests/unit/conf/'))
def setup(self):
Expand Down Expand Up @@ -131,6 +128,14 @@ def test_run_with_alert(self, extract_mock, rules_mock):

assert_true(passed)

@patch('stream_alert.rule_processor.handler.StreamClassifier.extract_service_and_entity')
def test_run_alert_count(self, extract_mock):
"""StreamAlert Class - Run, Check Count With 4 Logs"""
count = 4
extract_mock.return_value = ('kinesis', 'unit_test_default_stream')
self.__sa_handler.run(get_valid_event(count))
assert_equal(self.__sa_handler._processed_record_count, count)

@patch('logging.Logger.debug')
@patch('stream_alert.rule_processor.handler.StreamClassifier.extract_service_and_entity')
def test_run_no_alerts(self, extract_mock, log_mock):
Expand Down

0 comments on commit 53d4207

Please sign in to comment.