Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Create shared AlertTable and Alert (with merge algorithm) #666

Merged
merged 12 commits into from
Apr 6, 2018
4 changes: 2 additions & 2 deletions .pylintrc
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ valid-metaclass-classmethod-first-arg=mcs
max-args=8

# Maximum number of attributes for a class (see R0902).
max-attributes=15
max-attributes=20

# Maximum number of boolean expressions in an if statement
max-bool-expr=5
Expand All @@ -367,7 +367,7 @@ max-bool-expr=5
max-branches=25

# Maximum number of locals for function / method body
max-locals=25
max-locals=30
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where do we draw the line here? 🤣

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True. Pylint was complaining that the Alert class had too many attributes. I felt it made sense as is, but if it grew considerably it should be refactored, which is why I raised this threshold instead of disabling the check on the class, for example. Do you feel we should restructure the class?


# Maximum number of parents for a class (see R0901).
max-parents=7
Expand Down
117 changes: 13 additions & 104 deletions stream_alert/alert_merger/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,111 +14,17 @@
limitations under the License.
"""
from __future__ import absolute_import
from decimal import Decimal
import json
import os
import time

from stream_alert.alert_merger import LOGGER
from stream_alert.shared.alert_table import AlertTable
from stream_alert.shared.metrics import ALERT_MERGER_NAME, MetricLogger

import boto3
from boto3.dynamodb.conditions import Attr, Key
from botocore.exceptions import ClientError


class AlertTable(object):
    """Encapsulates read and update access to the DynamoDB alerts table."""

    def __init__(self, table_name):
        # Resolve the table handle once; it is reused by every operation below.
        self.table = boto3.resource('dynamodb').Table(table_name)

    @staticmethod
    def _paginate(func, func_kwargs):
        """Yield every item from a paginated scan() or query().

        Args:
            func (method): Function to invoke (ALERTS_TABLE.scan or ALERTS_TABLE.query)
            func_kwargs (dict): Keyword arguments to pass to the scan/query function.
                Mutated in place (ExclusiveStartKey) whenever pagination is needed.

        Yields:
            dict: Each item (row) from the response
        """
        finished = False
        while not finished:
            response = func(**func_kwargs)
            for item in response.get('Items', []):
                yield item

            last_key = response.get('LastEvaluatedKey')
            if last_key:
                # More pages remain - resume the scan/query from this key.
                func_kwargs['ExclusiveStartKey'] = last_key
            else:
                finished = True

    def rule_names(self):
        """Returns the set of distinct rule names (str) found in the table."""
        scan_kwargs = {
            'ProjectionExpression': 'RuleName',
            'Select': 'SPECIFIC_ATTRIBUTES'
        }
        names = set()
        for row in self._paginate(self.table.scan, scan_kwargs):
            names.add(row['RuleName'])
        return names

    def pending_alerts(self, rule_name, alert_proc_timeout_sec):
        """Find all alerts for the given rule which need to be dispatched to the alert processor.

        Args:
            rule_name (str): Select all alerts from this rule name
            alert_proc_timeout_sec (int): Alert processor timeout
                This is used to determine whether an alert could still be in progress

        Yields:
            dict: Each alert (row) with all columns and values.
        """
        query_kwargs = {
            # We need a consistent read here in order to pick up the most recent updates from the
            # alert processor. Otherwise, deleted/updated alerts may not yet have propagated.
            'ConsistentRead': True,

            # Include only those alerts which have not yet dispatched or were dispatched more than
            # ALERT_PROCESSOR_TIMEOUT seconds ago
            'FilterExpression': (Attr('Dispatched').not_exists() |
                                 Attr('Dispatched').lt(int(time.time()) - alert_proc_timeout_sec)),

            'KeyConditionExpression': Key('RuleName').eq(rule_name),
            'Select': 'ALL_ATTRIBUTES'
        }
        for row in self._paginate(self.table.query, query_kwargs):
            yield row

    def mark_as_dispatched(self, rule_name, alert_id):
        """Mark a specific alert as dispatched (in progress)."""
        # Record the dispatch time and bump the attempt counter, but only if the alert
        # still exists (the alert processor may delete it before this update lands).
        try:
            self.table.update_item(
                Key={'RuleName': rule_name, 'AlertID': alert_id},
                UpdateExpression='SET Dispatched = :now ADD Attempts :one',
                ExpressionAttributeValues={':now': int(time.time()), ':one': 1},
                ConditionExpression='attribute_exists(AlertID)'
            )
        except ClientError as error:
            # A conditional check failure means the alert was already deleted by the
            # alert processor - nothing to do. Any other client error is re-raised.
            if error.response['Error']['Code'] != 'ConditionalCheckFailedException':
                raise


class AlertEncoder(json.JSONEncoder):
    """JSON encoder extended to serialize sets (as lists) and Decimals (as floats)."""

    def default(self, obj):  # pylint: disable=arguments-differ,method-hidden
        # DynamoDB returns numbers as Decimal and alerts may carry sets;
        # neither is JSON-serializable by default.
        if isinstance(obj, Decimal):
            return float(obj)
        if isinstance(obj, set):
            return list(obj)
        return json.JSONEncoder.default(self, obj)


# TODO: Alert merging will be implemented here
class AlertMerger(object):
"""Dispatch alerts to the alert processor."""
ALERT_MERGER = None # AlertMerger instance which can be re-used across Lambda invocations
Expand All @@ -131,30 +37,33 @@ def get_instance(cls):
return cls.ALERT_MERGER

def __init__(self):
self.alerts_db = AlertTable(os.environ['ALERTS_TABLE'])
self.table = AlertTable(os.environ['ALERTS_TABLE'])
self.alert_proc = os.environ['ALERT_PROCESSOR']
self.alert_proc_timeout = int(os.environ['ALERT_PROCESSOR_TIMEOUT_SEC'])
self.lambda_client = boto3.client('lambda')

def _dispatch_alert(self, alert):
"""Dispatch all alerts which need to be sent to the rule processor."""
this_attempt_num = alert.get('Attempts', 0) + 1
LOGGER.info('Dispatching alert %s to %s (attempt %d)',
alert['AlertID'], self.alert_proc, this_attempt_num)
MetricLogger.log_metric(ALERT_MERGER_NAME, MetricLogger.ALERT_ATTEMPTS, this_attempt_num)
alert.attempts += 1
LOGGER.info('Dispatching %s to %s (attempt %d)', alert, self.alert_proc, alert.attempts)
MetricLogger.log_metric(ALERT_MERGER_NAME, MetricLogger.ALERT_ATTEMPTS, alert.attempts)

self.lambda_client.invoke(
FunctionName=self.alert_proc,
InvocationType='Event',
Payload=json.dumps(alert, cls=AlertEncoder, separators=(',', ':')),
# The maximum async invocation size for Lambda is 128 KB. Since alerts could be larger
# than that, the alert processor is responsible for pulling the full record.
Payload=json.dumps(alert.dynamo_key),
Qualifier='production'
)
self.alerts_db.mark_as_dispatched(alert['RuleName'], alert['AlertID'])

alert.last_dispatched = int(time.time())
self.table.mark_as_dispatched(alert)

def dispatch(self):
"""Find and dispatch all pending alerts to the alert processor."""
for rule_name in self.alerts_db.rule_names():
for alert in self.alerts_db.pending_alerts(rule_name, self.alert_proc_timeout):
for rule_name in self.table.rule_names():
for alert in self.table.pending_alerts(rule_name, self.alert_proc_timeout):
self._dispatch_alert(alert)


Expand Down
16 changes: 0 additions & 16 deletions stream_alert/alert_processor/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,6 @@
See the License for the specific language governing permissions and
limitations under the License.
"""
from collections import OrderedDict


def ordered_dict(data, exclude_keys=None):
    """Convert a dictionary into a sorted OrderedDictionary, removing extraneous keys.

    Nested dictionaries are converted recursively; exclusions apply at every level.
    """
    # A falsy exclude_keys means "exclude nothing".
    skip = exclude_keys or ()
    result = OrderedDict()
    # Keys are unique, so sorting keys gives the same order as sorting items.
    for key in sorted(data):
        if key in skip:
            continue
        value = data[key]
        result[key] = ordered_dict(value, exclude_keys) if isinstance(value, dict) else value
    return result


def elide_string_middle(text, max_length):
Expand Down
Loading