Create shared AlertTable and Alert (with merge algorithm) #666
Conversation
Also, just realized I got PR 666! 😈 😈 😈
Awesome work! One quick comment
stream_alert/shared/alert_table.py (outdated)
""" | ||
# Update the alerts table with the dispatch time, but only if the alert still exists. | ||
# (The alert processor could have deleted the alert before this table update finishes). | ||
try: |
Could this `try/except` live in a wrapper function around `update_item` instead, which both `mark_as_dispatched` and `update_retry_outputs` use?
Yeah, good idea. In my original implementation, it was only on one of the functions.
In fact, creating a decorator turned out to be even cleaner!

```python
@ignore_conditional_failure
def update_table(self, ...):
```
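For readers outside the PR, a minimal standalone sketch of what such a decorator could look like. The exception class here is a stand-in, not the actual botocore error handling in the PR:

```python
import functools


class ConditionalCheckFailed(Exception):
    """Stand-in for Dynamo's ConditionalCheckFailedException (hypothetical)."""


def ignore_conditional_failure(func):
    """Swallow conditional-update failures so that updating a row for an
    already-deleted alert becomes a no-op instead of raising an error."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except ConditionalCheckFailed:
            return None  # The alert no longer exists - nothing to update
    return wrapper


@ignore_conditional_failure
def update_table(alert_exists):
    # Simulates a conditional Dynamo update: fails if the alert is gone
    if not alert_exists:
        raise ConditionalCheckFailed
    return 'updated'
```

This keeps the try/except in one place, so both `mark_as_dispatched` and `update_retry_outputs` can share it.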
A few minor comments and a request for clarification, but otherwise this looks awesome!! :)
```diff
@@ -367,7 +367,7 @@ max-bool-expr=5
 max-branches=25

 # Maximum number of locals for function / method body
-max-locals=25
+max-locals=30
```
where do we draw the line here? 🤣
True. Pylint was complaining that the `Alert` class had too many attributes. I felt it made sense as is, but if it grew considerably it should be refactored, which is why I raised this threshold instead of, for example, disabling the check on the class. Do you feel we should restructure the class?
```diff
@@ -68,14 +68,16 @@ def _tf_vpc_config(lambda_config):
     return result


-def generate_lambda(function_name, lambda_config, config, environment=None):
+def generate_lambda(function_name, lambda_config, config, environment=None, metrics_lookup=None):
```
sorry for this - thanks for fixing!!!
stream_alert/alert_processor/main.py (outdated)
```python
        elif any(output_results.values()):
            # At least one output succeeded - update the table with those outputs which need retried
            self.alerts_table.update_retry_outputs(alert)
        # If all outputs failed, no table updates are necessary
```
this hanging comment is a bit odd - what line is it related to exactly? maybe it's the `else` case..?
Yeah, it's the `else` case, but having `else: pass` seemed unnecessary. If I start the comment with `# else: ...`, would that be clearer?
stream_alert/alert_processor/main.py (outdated)
```python
            if dispatcher:
                results[output] = self._send_alert(payload, output, dispatcher)

        # Remove normalization key from the record
        if NORMALIZATION_KEY in alert.record:
```
why is removal of this key required? don't we want this information on the receiving end? I don't believe we've done this in the past
Yep, we are currently removing the data normalization key when building the alert payload here:
streamalert/stream_alert/alert_processor/main.py
Lines 65 to 89 in 8692807
```python
    @staticmethod
    def _build_alert_payload(record):
        """Transform a raw Dynamo record into a payload ready for dispatching.

        Args:
            record (dict): A row in the Dynamo alerts table

        Returns:
            OrderedDict: An alert payload ready to be sent to output dispatchers.
        """
        # Any problems with the alert keys or JSON loading will raise an exception here.
        # This is what we want - an invalid alert is a show-stopper and shouldn't ever happen.
        return ordered_dict({
            'cluster': record['Cluster'],
            'created': record['Created'],
            'id': record['AlertID'],
            'log_source': record['LogSource'],
            'log_type': record['LogType'],
            'outputs': record['Outputs'],
            'record': json.loads(record['Record']),
            'rule_description': record['RuleDescription'],
            'rule_name': record['RuleName'],
            'source_entity': record['SourceEntity'],
            'source_service': record['SourceService']
        }, exclude_keys={NORMALIZATION_KEY})
```
We could discuss adding it back in, but for now I want to keep the same behavior
sure, I see that we're doing that now - but before the changes you link to, were we doing that? I'm unsure why we would want to remove these?
Removing normalized keys from outputs has been in place since September: 201aa1e#diff-292514b478ade4d1cb4f29a7b9c32528

Perhaps @chunyong-lin can provide more context, but my guess is that it doesn't provide much useful information when triaging the alert, since it is essentially a copy of many of the fields in the alert. Saying `source_hash_md5 = 'ABCD'` in the alert AND `streamalert:normalizedtypes = { 'md5': 'ABCD'}` makes the PagerDuty alert unnecessarily verbose, for example. Note that threat intel IOCs are retained.

But I 100% agree that it would be useful to have this information at least in Athena, so we could query for all alerts which contain a given type. Thus far, we have sent exactly the same alert payload to every output. With alert merging, this will be changing, and we should definitely revisit this soon
I've added a TODO to make sure we don't forget about it - great point
sure - that makes sense :) thanks for the context!
```diff
@@ -63,7 +63,7 @@ def format_output_config(cls, service_config, values):
             **{values['descriptor'].value: values['aws_value'].value})

     @abstractmethod
-    def dispatch(self, **kwargs):
+    def dispatch(self, alert, descriptor):
```
thank you for this change!! this is how I initially wanted it :)
Yay!
stream_alert/shared/alert.py (outdated)
```python
            )
        )

        self.alert_id = kwargs.get('alert_id') or str(uuid.uuid4())
```
instead of doing the `kwargs.get(...) or ...` logic, why not just supply a default value to `kwargs.get()`? for example: `kwargs.get('alert_id', str(uuid.uuid4()))`, since kwargs is just a dictionary. is there some benefit I'm missing?
I address this in a comment a few lines down, but the basic problem is that you could have Falsey values of the wrong type.

In particular, Dynamo will reject empty strings and empty sets. So if someone were to call `Alert(alert_id='')`, a simple `kwargs.get('alert_id')` will accept the empty string value, which we don't want. The idea of using `or` is to force any Falsey value (0, None, '', [], {}) to the Falsey value of the appropriate type.

Happy to discuss whether this is necessary in every case (`alert_id` in particular should never be an empty string, but `cluster` or `log_source` could be), but it seemed a bit safer to enforce the correct types from the beginning of Alert creation
aha! this is actually the reasoning I had behind this code awhile back, but removed that because I couldn't recall my reasoning :LOL: the gist there was that I wanted to support having a value in the config (so the user knew the config setting existed), but if said value was blank we would fall back on a default. a typical `.get(...)` was not enough for this.

Thanks for the explanation :) I'm okay with leaving as is (and sorry I didn't see your comment :P)
I expanded on the comment and moved it further up to make it easier to see
stream_alert/shared/alert.py (outdated)
```python
        if not set(kwargs).issubset(self._EXPECTED_INIT_KWARGS):
            raise TypeError(
                'Invalid Alert kwargs: {} is not a subset of {}'.format(
                    ','.join(sorted(kwargs)), ','.join(sorted(self._EXPECTED_INIT_KWARGS))
```
Since this is going to result in a very large Exception message due to there being a large list of `_EXPECTED_INIT_KWARGS`, I'd suggest using a `difference` of the sets to make this more readable/actionable. something like:

```python
raise TypeError(
    'Invalid Alert kwargs: {} argument(s) are not in the expected set of {}'.format(
        ', '.join(sorted(set(kwargs).difference(self._EXPECTED_INIT_KWARGS))),
        ', '.join(sorted(self._EXPECTED_INIT_KWARGS))))
```

also putting a space after the comma in the join will help with readability
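A quick standalone check of what the set `difference` buys here (the kwarg names below are illustrative, not the class's actual `_EXPECTED_INIT_KWARGS`):

```python
expected = {'alert_id', 'cluster', 'log_source', 'log_type',
            'outputs', 'record', 'rule_name'}
kwargs = {'alert_id': '1', 'clutser': 'prod'}  # note the typo'd key

# Only the offending argument(s) are reported, not every key passed in
unexpected = ', '.join(sorted(set(kwargs).difference(expected)))
```

The error message then names just the bad key (`clutser`) instead of dumping the full kwarg list alongside the full expected list.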
Good idea, thanks! Will do
```python
        Returns:
            Alert: An alert with all properties populated from the Dynamo item
        """
        return cls(
```
++++++++++
```python
            'id': self.alert_id,
            'log_source': self.log_source or '',
            'log_type': self.log_type or '',
            'outputs': list(sorted(self.outputs)),  # List instead of set for JSON-compatibility
```
good use of comments ++
stream_alert/shared/alert.py (outdated)
```python
        }

        # Union all of the outputs together.
        all_outputs = set()
```
hmm, I'm not sure if this was covered in the design, but I don't know how I feel about this part. maybe you can offer some clarification, but it appears that similar alerts that have an output of, say, "whatever-bucket" can be merged with other alerts that do not have the output of "whatever-bucket" and instead have an output of "foo-firehose". is this correct? If so, I don't think this should happen. I believe the merge should only occur for alerts going to the same exact outputs. maybe I'm missing it or this logic exists elsewhere, but this sorta goes against the idea of segmentation.

for instance, if we configure a specific rule that alerts on sensitive info to send to one output (that is specific for this rule's use case), I'm thinking this logic could result in those alerts landing in other outputs as well. let me know if I'm not seeing this correctly...
EDIT: after speaking with jack and continuing my review I think maybe this merging only happens per-rule and not across rules (please confirm). in which case, when would the outputs differ and need to be union-ed here?
Correct - merging will only happen on the same rule name.
The possibility of different outputs comes from large merge windows. A rule may have a merge window of 24-72 hours, and the outputs listed in that rule (or any parameters, really) could change in that timeframe. The most conservative approach here is to send to every output which was defined at any point during the merge window, but alternatively we could just use the set of outputs in the most recent version of the rule
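A standalone illustration of the conservative approach described above (the output names and dict shape are made up for the example, not the PR's actual merge code):

```python
# Each alert in the merge window carries the outputs its rule declared at
# the time it fired; the rule's outputs may change mid-window.
alerts_in_window = [
    {'rule_name': 'sensitive_info', 'outputs': {'aws-s3:whatever-bucket'}},
    {'rule_name': 'sensitive_info',
     'outputs': {'aws-s3:whatever-bucket', 'pagerduty:sec-team'}},
]

# Union: the merged alert is delivered to every output that was
# configured at any point during the merge window.
all_outputs = set()
for alert in alerts_in_window:
    all_outputs |= alert['outputs']
```

The alternative discussed below is to keep only the most recent alert's output set instead of the union.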
I like the idea of using the latest set of outputs, but could go either way on this. let's leave it as is and maybe discuss if it becomes a problem for some reason
I agree latest might be more clear. Otherwise, someone might remove an output and wonder why, 12 hours later, a merged alert raised a bunch of errors for trying to deliver to an output which doesn't exist.
PTAL @jacknagz @ryandeivert. I've addressed your feedback and confirmed it still works end to end
changes LGTM! thanks 🎉
to: @ryandeivert @jacknagz
cc: @airbnb/streamalert-maintainers
size: large
Background
This implements the actual algorithm for merging alerts. Merging is not yet being enabled; this is the logic to make it possible.
In addition, to make this change and future development easier, I've taken the time to refactor an Alert and AlertTable class as shared libraries used by all core Lambda functions.
Changes
Currently, the alert merger sends the raw alert payload to the alert processor. However, there is a relatively low limit on Lambda invocation payload size (128 KB). Since merged alerts may be even bigger than normal, we get ahead of this problem by sending only the alert ID / rule name to the alert processor. Then the alert processor will download the full record from Dynamo.
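A sketch of the slimmed-down invocation payload this implies (the key names are illustrative, not necessarily the PR's exact fields):

```python
import json

# Instead of the full (possibly merged, possibly very large) alert record,
# the merger sends only the keys needed to look the alert up in Dynamo;
# the alert processor then fetches the full record from the alerts table.
invocation_payload = json.dumps({
    'alert_id': 'example-alert-id',  # hypothetical key names
    'rule_name': 'example_rule'
})

# Trivially below the Lambda invocation payload limit, regardless of
# how large the merged alert record itself grows.
assert len(invocation_payload) < 128 * 1024
```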
- Create `AlertTable` class in `shared/` for the Dynamo alerts table
- Create `Alert` class in `shared/`, replacing the dict/OrderedDict in use now
- `Alert` includes a new `staged` attribute to be used when rule baking is implemented

RetryOutputs Improvements
- `RetryOutputs` - the next alert processor will retry the same set of outputs.
- The `RetryOutputs` update will now fail if the alert no longer exists. This completely eliminates the possibility of the update operation creating an invalid entry in the table if the alert was already deleted somehow (e.g. manually)
- Fixes a bug introduced by #664 ([cli][tf] lambda tf module generation now expects the lambda config from caller), where the CLI raised an exception when trying to generate metric filters for functions using the `tf_lambda` module

Testing
- Unit tests for the `Alert` and `AlertTable` classes