-
Notifications
You must be signed in to change notification settings - Fork 334
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[core] drop sns publish for lambda invoke #225
Changes from 1 commit
16c624b
bc59d64
3d352e6
33694f6
bfd7111
90f0739
252b5ea
dd79093
d71981a
b48c3a6
3485350
21cb539
5c54db5
bf8433e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,121 +13,88 @@ | |
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
''' | ||
|
||
import json | ||
import logging | ||
import sys | ||
|
||
import boto3 | ||
from botocore.exceptions import ClientError | ||
|
||
_SNS_MAX_SIZE = (256*1024) | ||
|
||
logging.basicConfig() | ||
LOGGER = logging.getLogger('StreamAlert') | ||
|
||
def json_dump(sns_dict, indent_value=None): | ||
def _json_dump(alert, indent_value=None): | ||
def json_dict_serializer(obj): | ||
"""Helper method for marshalling dictionary objects to JSON""" | ||
return obj.__dict__ | ||
|
||
try: | ||
return json.dumps(sns_dict, indent=indent_value, default=json_dict_serializer) | ||
return json.dumps(alert, indent=indent_value, default=json_dict_serializer) | ||
except AttributeError as err: | ||
LOGGER.error('An error occurred while dumping object to JSON: %s', err) | ||
return "" | ||
|
||
class SNSMessageSizeError(Exception): | ||
pass | ||
|
||
class StreamSink(object): | ||
"""StreamSink class is used for sending actual alerts to the alert processor""" | ||
def __init__(self, env): | ||
"""StreamSink initializer | ||
|
||
Args: | ||
env [dict]: loaded dictionary containing environment information | ||
""" | ||
self.env = env | ||
self.client_sns = boto3.client('sns', region_name=self.env['lambda_region']) | ||
self.topic = self._get_sns_topic_arn() | ||
self.client_lambda = boto3.client('lambda', | ||
region_name=self.env['lambda_region']) | ||
self.function = self.env['lambda_function_name'].replace( | ||
'_streamalert_rule_processor', '_streamalert_alert_processor') | ||
|
||
def sink(self, alerts): | ||
"""Sink triggered alerts from the StreamRules engine. | ||
|
||
Args: | ||
alerts [list]: a list of dictionaries representating json alerts | ||
|
||
Sends a message to SNS with the following JSON format: | ||
{default: [ | ||
{ | ||
'record': record, | ||
'metadata': { | ||
'rule_name': rule.rule_name, | ||
'rule_description': rule.rule_function.__doc__, | ||
'log': str(payload.log_source), | ||
'outputs': rule.outputs, | ||
'type': payload.type, | ||
'source': { | ||
'service': payload.service, | ||
'entity': payload.entity | ||
} | ||
Sends a message to the alert processor with the following JSON format: | ||
{ | ||
"record": record, | ||
"metadata": { | ||
"rule_name": rule.rule_name, | ||
"rule_description": rule.rule_function.__doc__, | ||
"log": str(payload.log_source), | ||
"outputs": rule.outputs, | ||
"type": payload.type, | ||
"source": { | ||
"service": payload.service, | ||
"entity": payload.entity | ||
} | ||
} | ||
]} | ||
} | ||
""" | ||
for alert in alerts: | ||
sns_dict = {'default': alert} | ||
self.publish_message(json_dump(sns_dict)) | ||
|
||
def _get_sns_topic_arn(self): | ||
"""Return a properly formatted SNS ARN. | ||
|
||
Args: | ||
region: Which AWS region the SNS topic exists in. | ||
topic: The name of the SNS topic. | ||
""" | ||
topic = self.env['lambda_function_name'].replace('_streamalert_rule_processor', | ||
'_streamalerts') | ||
|
||
return 'arn:aws:sns:{region}:{account_id}:{topic}'.format( | ||
region=self.env['lambda_region'], | ||
account_id=self.env['account_id'], | ||
topic=topic | ||
) | ||
|
||
@staticmethod | ||
def _sns_message_size_check(message): | ||
"""Verify the SNS message is less than or equal to 256KB (SNS Limit) | ||
Args: | ||
message: A JSON string containing an alert to send to SNS. | ||
|
||
Returns: | ||
Boolean result of if the message is within the size constraint | ||
""" | ||
message_size = sys.getsizeof(message) | ||
return 0 < message_size <= _SNS_MAX_SIZE | ||
|
||
def publish_message(self, message): | ||
"""Emit a message to SNS. | ||
|
||
Args: | ||
client: The boto3 client object. | ||
message: A JSON string containing a serialized alert. | ||
topic: The SNS topic ARN to send to. | ||
""" | ||
if not self._sns_message_size_check(message): | ||
LOGGER.error('Cannot publish Alerts, message size is too big!') | ||
raise SNSMessageSizeError('SNS message size is too big! (Max: 256KB)') | ||
|
||
try: | ||
response = self.client_sns.publish( | ||
TopicArn=self.topic, | ||
Message=message, | ||
Subject='StreamAlert Rules Triggered' | ||
) | ||
except ClientError: | ||
LOGGER.exception('An error occurred while publishing alert to sns') | ||
return | ||
|
||
if response['ResponseMetadata']['HTTPStatusCode'] != 200: | ||
LOGGER.error('Failed to publish message to sns topic: %s', self.topic.split(':')[-1]) | ||
return | ||
|
||
if self.env['lambda_alias'] != 'development': | ||
LOGGER.info('Published alert to %s', self.topic.split(':')[-1]) | ||
LOGGER.info('SNS MessageID: %s', response['MessageId']) | ||
data = _json_dump(alert) | ||
|
||
try: | ||
response = self.client_lambda.invoke( | ||
FunctionName=self.function, | ||
InvocationType='Event', | ||
Payload=data | ||
) | ||
|
||
except ClientError as err: | ||
LOGGER.exception('An error occurred while sending alert to ' | ||
'\'%s\'. Error is: %s. Alert: %s', | ||
self.function, | ||
err.response, | ||
data) | ||
return | ||
|
||
if response['ResponseMetadata']['HTTPStatusCode'] != 202: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this block slightly repetitive to the preceding one? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you be more specific? What's repetitive about it (and are you referring to the try/except above?)? If so, I think an HTTP response of non-202 could possibly occur even if there is no ClientError occurred during invocation. I could also be off on that.. We could also add |
||
LOGGER.error('Failed to send alert to \'%s\': %s', | ||
self.function, data) | ||
return | ||
|
||
if self.env['lambda_alias'] != 'development': | ||
LOGGER.info('Sent alert to \'%s\' with Lambda request ID \'%s\'', | ||
self.function, | ||
response['ResponseMetadata']['RequestId']) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is still a maximum Lambda invocation size (6MB for asynchronous requests), but I don't think we'll be anywhere close to that limit. Just something to keep in mind
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Adding a comment to the PR body for a future consideration.