
Commit ec648d1
Merge pull request #282 from airbnb/ryandeivert-metric-filters
[metrics] v2 of metrics support using metric filters
ryandeivert authored Aug 31, 2017
2 parents 8710b15 + 7907cd0 commit ec648d1
Showing 31 changed files with 441 additions and 295 deletions.
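
The central change in this pull request is how per-function metrics reach CloudWatch. Rather than buffering datapoints in a Metrics object and pushing them to CloudWatch at the end of each invocation (the add_metric/send_metrics calls removed below), each Lambda function now writes its metric values to its own CloudWatch Logs output through MetricLogger.log_metric, and metric filters on those log groups promote the values into CloudWatch metrics. The sketch below is illustrative only, since stream_alert/shared/metrics.py is not expanded in this view; the constant names are taken from the call sites in the diffs, while the string values and the log format are assumptions.

# Illustrative sketch -- not the actual stream_alert/shared/metrics.py
import logging

LOGGER = logging.getLogger('StreamAlertShared')


class MetricLogger(object):
    """Log metric values in a predictable format that a CloudWatch Logs
    metric filter can match and turn into a CloudWatch metric."""

    # Metric names referenced by the processors below; the string values
    # here are assumptions
    TOTAL_RECORDS = 'TotalRecords'
    FAILED_PARSES = 'FailedParses'
    TRIGGERED_ALERTS = 'TriggeredAlerts'
    TOTAL_S3_RECORDS = 'TotalS3Records'
    S3_DOWNLOAD_TIME = 'S3DownloadTime'

    @classmethod
    def log_metric(cls, function_name, metric_name, value):
        # The function name ties the metric to the emitting Lambda's log
        # group; a metric filter on that group extracts the value
        LOGGER.info('{"metric_name": "%s", "metric_value": %s}', metric_name, value)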
6 changes: 3 additions & 3 deletions conf/global.json
@@ -6,11 +6,11 @@
"region": "us-east-1"
},
"infrastructure": {
"metrics": {
"enabled": true
},
"monitoring": {
"create_sns_topic": true
},
"metrics": {
"enabled": false
}
},
"terraform": {
21 changes: 21 additions & 0 deletions stream_alert/alert_processor/__init__.py
@@ -0,0 +1,21 @@
"""Initialize logging for the alert processor."""
import logging
import os

from stream_alert.shared import ALERT_PROCESSOR_NAME as FUNCTION_NAME

# Create a package level logger to import
LEVEL = os.environ.get('LOGGER_LEVEL', 'INFO').upper()

# Cast integer levels to avoid a ValueError
if LEVEL.isdigit():
LEVEL = int(LEVEL)

logging.basicConfig(format='%(name)s [%(levelname)s]: [%(module)s.%(funcName)s] %(message)s')

LOGGER = logging.getLogger('StreamAlertOutput')
try:
LOGGER.setLevel(LEVEL)
except (TypeError, ValueError) as err:
LOGGER.setLevel('INFO')
LOGGER.error('Defaulting to INFO logging: %s', err)
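
Both new package __init__ modules resolve the logger level from the LOGGER_LEVEL environment variable and tolerate named as well as numeric values. A few illustrative cases of that resolution logic:

# Illustrative examples of the LOGGER_LEVEL handling above:
#   LOGGER_LEVEL=debug  -> 'DEBUG' after .upper(), accepted by setLevel()
#   LOGGER_LEVEL=10     -> cast to int(10) via LEVEL.isdigit(), i.e. DEBUG
#   LOGGER_LEVEL=bogus  -> setLevel() raises ValueError, which is caught,
#                          and the logger falls back to INFO with an error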
6 changes: 1 addition & 5 deletions stream_alert/alert_processor/helpers.py
@@ -13,11 +13,7 @@
See the License for the specific language governing permissions and
limitations under the License.
"""
import logging

logging.basicConfig()
LOGGER = logging.getLogger('StreamAlertOutput')
LOGGER.setLevel(logging.DEBUG)
from stream_alert.alert_processor import LOGGER


def validate_alert(alert):
6 changes: 1 addition & 5 deletions stream_alert/alert_processor/main.py
@@ -15,15 +15,11 @@
"""
from collections import OrderedDict
import json
import logging

from stream_alert.alert_processor import LOGGER
from stream_alert.alert_processor.helpers import validate_alert
from stream_alert.alert_processor.outputs import get_output_dispatcher

logging.basicConfig()
LOGGER = logging.getLogger('StreamAlertOutput')
LOGGER.setLevel(logging.DEBUG)


def handler(event, context):
"""StreamAlert Alert Processor
6 changes: 1 addition & 5 deletions stream_alert/alert_processor/output_base.py
@@ -16,7 +16,6 @@
from abc import ABCMeta, abstractmethod
from collections import namedtuple
import json
import logging
import os
import ssl
import tempfile
@@ -25,8 +24,7 @@
import boto3
from botocore.exceptions import ClientError

logging.basicConfig()
LOGGER = logging.getLogger('StreamAlertOutput')
from stream_alert.alert_processor import LOGGER

OutputProperty = namedtuple('OutputProperty',
'description, value, input_restrictions, mask_input, cred_requirement')
@@ -274,7 +272,6 @@ def get_user_defined_properties(self):
Returns:
OrderedDict: Contains various OutputProperty items
"""
pass

@abstractmethod
def dispatch(self, **kwargs):
@@ -287,4 +284,3 @@ def dispatch(self, **kwargs):
rule_name (str): Name of the triggered rule
alert (dict): Alert relevant to the triggered rule
"""
pass
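
The removed pass statements are redundant: a method whose body is a docstring is already syntactically complete, and @abstractmethod still forces subclasses to override it either way. A minimal illustration (not an actual StreamAlert output class):

from abc import ABCMeta, abstractmethod


class ExampleOutput(object):
    """Illustration only -- not an actual StreamAlert output class."""
    __metaclass__ = ABCMeta  # Python 2 style metaclass, assumed to match the codebase

    @abstractmethod
    def dispatch(self, **kwargs):
        """Send an alert to this output service."""
        # No trailing 'pass' needed: the docstring is already a valid body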
5 changes: 1 addition & 4 deletions stream_alert/alert_processor/outputs.py
@@ -18,17 +18,14 @@
from collections import OrderedDict
from datetime import datetime
import json
import logging
import os
import uuid

import boto3

from stream_alert.alert_processor import LOGGER
from stream_alert.alert_processor.output_base import OutputProperty, StreamOutputBase

logging.basicConfig()
LOGGER = logging.getLogger('StreamAlertOutput')

# STREAM_OUTPUTS will contain each subclass of the StreamOutputBase
# All included subclasses are designated using the '@output' class decorator
# The keys are the name of the service and the value is the class itself
21 changes: 21 additions & 0 deletions stream_alert/athena_partition_refresh/__init__.py
@@ -0,0 +1,21 @@
"""Initialize logging for the athena partition refresh function."""
import logging
import os

from stream_alert.shared import ATHENA_PARTITION_REFRESH_NAME as FUNCTION_NAME

# Create a package level logger to import
LEVEL = os.environ.get('LOGGER_LEVEL', 'INFO').upper()

# Cast integer levels to avoid a ValueError
if LEVEL.isdigit():
LEVEL = int(LEVEL)

logging.basicConfig(format='%(name)s [%(levelname)s]: [%(module)s.%(funcName)s] %(message)s')

LOGGER = logging.getLogger('StreamAlertAthena')
try:
LOGGER.setLevel(LEVEL)
except (TypeError, ValueError) as err:
LOGGER.setLevel('INFO')
LOGGER.error('Defaulting to INFO logging: %s', err)
7 changes: 1 addition & 6 deletions stream_alert/athena_partition_refresh/main.py
@@ -16,19 +16,14 @@
from collections import defaultdict
from datetime import datetime
import json
import logging
import os
import re
import urllib

import backoff
import boto3

logging.basicConfig(
format='%(name)s [%(levelname)s]: [%(module)s.%(funcName)s] %(message)s')
LEVEL = os.environ.get('LOGGER_LEVEL', 'INFO')
LOGGER = logging.getLogger('StreamAlertAthena')
LOGGER.setLevel(LEVEL.upper())
from stream_alert.athena_partition_refresh import LOGGER


def _backoff_handler(details):
2 changes: 2 additions & 0 deletions stream_alert/rule_processor/__init__.py
@@ -2,6 +2,8 @@
import logging
import os

from stream_alert.shared import RULE_PROCESSOR_NAME as FUNCTION_NAME

# Create a package level logger to import
LEVEL = os.environ.get('LOGGER_LEVEL', 'INFO').upper()

26 changes: 8 additions & 18 deletions stream_alert/rule_processor/handler.py
@@ -16,13 +16,13 @@
from logging import DEBUG as LOG_LEVEL_DEBUG
import json

from stream_alert.rule_processor import LOGGER
from stream_alert.rule_processor import FUNCTION_NAME, LOGGER
from stream_alert.rule_processor.classifier import StreamClassifier
from stream_alert.rule_processor.config import load_config, load_env
from stream_alert.rule_processor.payload import load_stream_payload
from stream_alert.rule_processor.rules_engine import StreamRules
from stream_alert.rule_processor.sink import StreamSink
from stream_alert.shared.metrics import Metrics
from stream_alert.shared.metrics import MetricLogger


class StreamAlert(object):
@@ -51,7 +51,6 @@ def __init__(self, context, enable_alert_processor=True):
# Instantiate a classifier that is used for this run
self.classifier = StreamClassifier(config=config)

self.metrics = Metrics('RuleProcessor', self.env['lambda_region'])
self.enable_alert_processor = enable_alert_processor
self._failed_record_count = 0
self._alerts = []
@@ -76,10 +75,7 @@ def run(self, event):
if not records:
return False

self.metrics.add_metric(
Metrics.Name.TOTAL_RECORDS,
len(records),
Metrics.Unit.COUNT)
MetricLogger.log_metric(FUNCTION_NAME, MetricLogger.TOTAL_RECORDS, len(records))

for raw_record in records:
# Get the service and entity from the payload. If the service/entity
@@ -101,33 +97,27 @@ def run(self, event):
continue

# Create the StreamPayload to use for encapsulating parsed info
payload = load_stream_payload(service, entity, raw_record, self.metrics)
payload = load_stream_payload(service, entity, raw_record)
if not payload:
continue

self._process_alerts(payload)

LOGGER.debug('Invalid record count: %d', self._failed_record_count)

self.metrics.add_metric(
Metrics.Name.FAILED_PARSES,
self._failed_record_count,
Metrics.Unit.COUNT)
MetricLogger.log_metric(FUNCTION_NAME,
MetricLogger.FAILED_PARSES,
self._failed_record_count)

LOGGER.debug('%s alerts triggered', len(self._alerts))

self.metrics.add_metric(
Metrics.Name.TRIGGERED_ALERTS, len(
self._alerts), Metrics.Unit.COUNT)
MetricLogger.log_metric(FUNCTION_NAME, MetricLogger.TRIGGERED_ALERTS, len(self._alerts))

# Check if debugging logging is on before json dumping alerts since
# this can be time consuming if there are a lot of alerts
if self._alerts and LOGGER.isEnabledFor(LOG_LEVEL_DEBUG):
LOGGER.debug('Alerts:\n%s', json.dumps(self._alerts, indent=2))

# Send any cached metrics to CloudWatch before returning
self.metrics.send_metrics()

return self._failed_record_count == 0

def get_alerts(self):
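
With this change the rule processor no longer instantiates a Metrics object, buffers datapoints, or flushes them with send_metrics at the end of run(); each value is logged as soon as it is known, and the unit argument disappears from the call sites, presumably because units are now handled outside the Lambda code. The corresponding CloudWatch Logs metric filters are part of the infrastructure and are not shown in this commit view, so the names and pattern below are hypothetical; the call illustrates only how a metric filter turns a JSON-formatted log line into a CloudWatch metric.

import boto3

logs = boto3.client('logs', region_name='us-east-1')

# Hypothetical log group, filter name, and pattern -- for illustration only
logs.put_metric_filter(
    logGroupName='/aws/lambda/stream_alert_rule_processor',
    filterName='TotalRecordsFilter',
    filterPattern='{ $.metric_name = "TotalRecords" }',
    metricTransformations=[{
        'metricName': 'TotalRecords',
        'metricNamespace': 'StreamAlert',
        'metricValue': '$.metric_value',
    }]
)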
17 changes: 7 additions & 10 deletions stream_alert/rule_processor/payload.py
@@ -25,18 +25,17 @@

import boto3

from stream_alert.rule_processor import LOGGER
from stream_alert.shared.metrics import Metrics
from stream_alert.rule_processor import FUNCTION_NAME, LOGGER
from stream_alert.shared.metrics import MetricLogger


def load_stream_payload(service, entity, raw_record, metrics):
def load_stream_payload(service, entity, raw_record):
"""Returns the right StreamPayload subclass for this service
Args:
service (str): service name to load class for
entity (str): entity for this service
raw_record (str): record raw payload data
metrics (Metrics): payload metrics
"""
payload_map = {'s3': S3Payload,
'sns': SnsPayload,
Expand All @@ -46,7 +45,7 @@ def load_stream_payload(service, entity, raw_record, metrics):
LOGGER.error('Service payload not supported: %s', service)
return

return payload_map[service](raw_record=raw_record, entity=entity, metrics=metrics)
return payload_map[service](raw_record=raw_record, entity=entity)


class StreamPayload(object):
@@ -76,7 +75,6 @@ def __init__(self, **kwargs):
"""
self.raw_record = kwargs['raw_record']
self.entity = kwargs['entity']
self.metrics = kwargs['metrics']
self.pre_parsed_record = None

self._refresh_record(None)
@@ -177,7 +175,7 @@ def pre_parse(self):
avg_record_size,
self.s3_object_size)

self.metrics.add_metric(Metrics.Name.TOTAL_S3_RECORDS, line_num, Metrics.Unit.COUNT)
MetricLogger.log_metric(FUNCTION_NAME, MetricLogger.TOTAL_S3_RECORDS, line_num)

def _download_object(self, region, bucket, key):
"""Download an object from S3.
@@ -216,9 +214,8 @@ def _download_object(self, region, bucket, key):
total_time = time.time() - start_time
LOGGER.info('Completed download in %s seconds', round(total_time, 2))

# Publish a metric on how long this object took to download
self.metrics.add_metric(
Metrics.Name.S3_DOWNLOAD_TIME, total_time, Metrics.Unit.SECONDS)
# Log a metric on how long this object took to download
MetricLogger.log_metric(FUNCTION_NAME, MetricLogger.S3_DOWNLOAD_TIME, total_time)

return downloaded_s3_object

7 changes: 6 additions & 1 deletion stream_alert/shared/__init__.py
@@ -1,7 +1,12 @@
"""Define logger for shared functionality."""
"""Define some shared resources."""
import logging
import os


ALERT_PROCESSOR_NAME = 'alert_processor'
ATHENA_PARTITION_REFRESH_NAME = 'athena_partition_refresh'
RULE_PROCESSOR_NAME = 'rule_processor'

# Create a package level logger to import
LEVEL = os.environ.get('LOGGER_LEVEL', 'INFO').upper()

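
The new function-name constants give each Lambda a stable identifier to key its logged metrics on; the rule processor aliases RULE_PROCESSOR_NAME as FUNCTION_NAME and passes it to every log_metric call, as seen in handler.py above. Purely as a usage illustration (both imports appear in the diffs in this commit):

from stream_alert.shared import RULE_PROCESSOR_NAME
from stream_alert.shared.metrics import MetricLogger

# Attribute a metric value to the rule processor function
MetricLogger.log_metric(RULE_PROCESSOR_NAME, MetricLogger.TOTAL_RECORDS, 100)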
(The remaining changed files in this commit are not expanded in this view.)
