Skip to content

Commit

Permalink
Create shared AlertTable and Alert (with merge algorithm) (#666)
Browse files Browse the repository at this point in the history
  • Loading branch information
austinbyers authored Apr 6, 2018
1 parent 63264bf commit 943e6cc
Show file tree
Hide file tree
Showing 37 changed files with 1,644 additions and 1,054 deletions.
4 changes: 2 additions & 2 deletions .pylintrc
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ valid-metaclass-classmethod-first-arg=mcs
max-args=8

# Maximum number of attributes for a class (see R0902).
max-attributes=15
max-attributes=20

# Maximum number of boolean expressions in a if statement
max-bool-expr=5
Expand All @@ -367,7 +367,7 @@ max-bool-expr=5
max-branches=25

# Maximum number of locals for function / method body
max-locals=25
max-locals=30

# Maximum number of parents for a class (see R0901).
max-parents=7
Expand Down
117 changes: 13 additions & 104 deletions stream_alert/alert_merger/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,111 +14,17 @@
limitations under the License.
"""
from __future__ import absolute_import
from decimal import Decimal
import json
import os
import time

from stream_alert.alert_merger import LOGGER
from stream_alert.shared.alert_table import AlertTable
from stream_alert.shared.metrics import ALERT_MERGER_NAME, MetricLogger

import boto3
from boto3.dynamodb.conditions import Attr, Key
from botocore.exceptions import ClientError


class AlertTable(object):
    """Convenience wrapper around the DynamoDB alerts table."""

    def __init__(self, table_name):
        """Bind to the named DynamoDB table.

        Args:
            table_name (str): Name of the alerts table in DynamoDB.
        """
        self.table = boto3.resource('dynamodb').Table(table_name)

    @staticmethod
    def _paginate(func, func_kwargs):
        """Yield every item from a paginated scan() or query().

        Args:
            func (method): Function to invoke (ALERTS_TABLE.scan or ALERTS_TABLE.query)
            func_kwargs (dict): Keyword arguments to pass to the scan/query function.
                Mutated in place when pagination is necessary.

        Yields:
            dict: Each item (row) from the response
        """
        finished = False
        while not finished:
            response = func(**func_kwargs)
            for row in response.get('Items', []):
                yield row

            # A LastEvaluatedKey in the response means there are more pages to fetch.
            next_key = response.get('LastEvaluatedKey')
            if next_key:
                func_kwargs['ExclusiveStartKey'] = next_key
            else:
                finished = True

    def rule_names(self):
        """Returns the set of distinct rule names (str) found in the table."""
        scan_kwargs = {
            'ProjectionExpression': 'RuleName',
            'Select': 'SPECIFIC_ATTRIBUTES'
        }
        return {row['RuleName'] for row in self._paginate(self.table.scan, scan_kwargs)}

    def pending_alerts(self, rule_name, alert_proc_timeout_sec):
        """Find all alerts for the given rule which need to be dispatched to the alert processor.

        Args:
            rule_name (str): Select all alerts from this rule name
            alert_proc_timeout_sec (int): Alert processor timeout, used to decide
                whether a previously dispatched alert could still be in progress.

        Yields:
            dict: Each alert (row) with all columns and values.
        """
        # Anything dispatched before this moment is considered timed out.
        retry_cutoff = int(time.time()) - alert_proc_timeout_sec
        query_kwargs = {
            # We need a consistent read here in order to pick up the most recent updates from the
            # alert processor. Otherwise, deleted/updated alerts may not yet have propagated.
            'ConsistentRead': True,

            # Include only those alerts which have not yet dispatched or were dispatched more than
            # ALERT_PROCESSOR_TIMEOUT seconds ago
            'FilterExpression': (Attr('Dispatched').not_exists() |
                                 Attr('Dispatched').lt(retry_cutoff)),

            'KeyConditionExpression': Key('RuleName').eq(rule_name),
            'Select': 'ALL_ATTRIBUTES'
        }
        return self._paginate(self.table.query, query_kwargs)

    def mark_as_dispatched(self, rule_name, alert_id):
        """Mark a specific alert as dispatched (in progress)."""
        # Record the dispatch time and bump the attempt counter, but only if the alert
        # still exists: the alert processor could delete it before this update lands.
        try:
            self.table.update_item(
                Key={'RuleName': rule_name, 'AlertID': alert_id},
                UpdateExpression='SET Dispatched = :now ADD Attempts :one',
                ExpressionAttributeValues={':now': int(time.time()), ':one': 1},
                ConditionExpression='attribute_exists(AlertID)'
            )
        except ClientError as err:
            # A conditional-check failure means the alert was already deleted by the
            # alert processor - nothing to do. Any other client error is re-raised.
            if err.response['Error']['Code'] != 'ConditionalCheckFailedException':
                raise


class AlertEncoder(json.JSONEncoder):
    """JSON encoder which additionally serializes sets (as lists) and Decimals (as floats)."""

    def default(self, obj):  # pylint: disable=arguments-differ,method-hidden
        """Convert otherwise-unserializable types; defer everything else to the base class."""
        if isinstance(obj, Decimal):
            return float(obj)
        if isinstance(obj, set):
            return list(obj)
        return super(AlertEncoder, self).default(obj)


# TODO: Alert merging will be implemented here
class AlertMerger(object):
"""Dispatch alerts to the alert processor."""
ALERT_MERGER = None # AlertMerger instance which can be re-used across Lambda invocations
Expand All @@ -131,30 +37,33 @@ def get_instance(cls):
return cls.ALERT_MERGER

def __init__(self):
self.alerts_db = AlertTable(os.environ['ALERTS_TABLE'])
self.table = AlertTable(os.environ['ALERTS_TABLE'])
self.alert_proc = os.environ['ALERT_PROCESSOR']
self.alert_proc_timeout = int(os.environ['ALERT_PROCESSOR_TIMEOUT_SEC'])
self.lambda_client = boto3.client('lambda')

def _dispatch_alert(self, alert):
"""Dispatch all alerts which need to be sent to the rule processor."""
this_attempt_num = alert.get('Attempts', 0) + 1
LOGGER.info('Dispatching alert %s to %s (attempt %d)',
alert['AlertID'], self.alert_proc, this_attempt_num)
MetricLogger.log_metric(ALERT_MERGER_NAME, MetricLogger.ALERT_ATTEMPTS, this_attempt_num)
alert.attempts += 1
LOGGER.info('Dispatching %s to %s (attempt %d)', alert, self.alert_proc, alert.attempts)
MetricLogger.log_metric(ALERT_MERGER_NAME, MetricLogger.ALERT_ATTEMPTS, alert.attempts)

self.lambda_client.invoke(
FunctionName=self.alert_proc,
InvocationType='Event',
Payload=json.dumps(alert, cls=AlertEncoder, separators=(',', ':')),
# The maximum async invocation size for Lambda is 128 KB. Since alerts could be larger
# than that, the alert processor is responsible for pulling the full record.
Payload=json.dumps(alert.dynamo_key),
Qualifier='production'
)
self.alerts_db.mark_as_dispatched(alert['RuleName'], alert['AlertID'])

alert.last_dispatched = int(time.time())
self.table.mark_as_dispatched(alert)

def dispatch(self):
"""Find and dispatch all pending alerts to the alert processor."""
for rule_name in self.alerts_db.rule_names():
for alert in self.alerts_db.pending_alerts(rule_name, self.alert_proc_timeout):
for rule_name in self.table.rule_names():
for alert in self.table.pending_alerts(rule_name, self.alert_proc_timeout):
self._dispatch_alert(alert)


Expand Down
16 changes: 0 additions & 16 deletions stream_alert/alert_processor/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,6 @@
See the License for the specific language governing permissions and
limitations under the License.
"""
from collections import OrderedDict


def ordered_dict(data, exclude_keys=None):
    """Return *data* as an OrderedDict sorted by key, dropping excluded keys.

    Nested dictionaries are converted recursively with the same exclusions applied.

    Args:
        data (dict): Dictionary to convert.
        exclude_keys (container): Optional keys to omit at every nesting level.

    Returns:
        OrderedDict: Sorted copy of data without the excluded keys.
    """
    kept = (
        (key, value) for key, value in sorted(data.items())
        if not (exclude_keys and key in exclude_keys)
    )
    return OrderedDict(
        (key, ordered_dict(value, exclude_keys) if isinstance(value, dict) else value)
        for key, value in kept
    )


def elide_string_middle(text, max_length):
Expand Down
Loading

0 comments on commit 943e6cc

Please sign in to comment.