[lambda][rule] some s3 optimizations #126

Merged · 2 commits · Apr 28, 2017
stream_alert/rule_processor/handler.py (5 changes: 2 additions & 3 deletions)
@@ -78,9 +78,8 @@ def _kinesis_process(self, payload, classifier):

def _s3_process(self, payload, classifier):
"""Process S3 data for alerts"""
- s3_file_lines = StreamPreParsers.pre_parse_s3(payload.raw_record)
- for line in s3_file_lines:
-     data = line.rstrip()
+ s3_file = StreamPreParsers.pre_parse_s3(payload.raw_record)
+ for data in StreamPreParsers.read_s3_file(s3_file):
payload.refresh_record(data)
self.process_alerts(classifier, payload, data)

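For context on why this counts as an S3 optimization: the old _s3_process got back every line of the downloaded object at once (pre_parse_s3 returned a full list of lines), while the new code receives a file path and streams lines through the read_s3_file generator, so only one line needs to be held in memory at a time. A minimal standalone sketch of the difference, not code from this PR (the path argument is hypothetical):

# Illustrative sketch only; contrasts the two reading strategies this diff swaps between.

def read_all_lines(path):
    # Old approach: build the complete list of lines before returning it.
    with open(path, 'r') as f:
        return f.readlines()

def stream_lines(path):
    # New approach: a generator that yields one stripped line at a time.
    with open(path, 'r') as f:
        for line in f:
            yield line.rstrip()

# Either version can drive the same processing loop:
#     for data in stream_lines('/tmp/example.log'):
#         process(data)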
stream_alert/rule_processor/pre_parsers.py (71 changes: 32 additions & 39 deletions)
@@ -13,7 +13,6 @@
See the License for the specific language governing permissions and
limitations under the License.
'''
-
import base64
import gzip
import logging
@@ -65,7 +64,7 @@ def pre_parse_s3(cls, raw_record):
size = int(raw_record['s3']['object']['size'])
downloaded_s3_object = cls._download_s3_object(client, bucket, key, size)

- return cls._read_s3_file(downloaded_s3_object)
+ return downloaded_s3_object

@classmethod
def pre_parse_sns(cls, raw_record):
@@ -78,6 +77,37 @@ def pre_parse_sns(cls, raw_record):
"""
return base64.b64decode(raw_record['Sns']['Message'])

+ @classmethod
+ def read_s3_file(cls, downloaded_s3_object):
+ """Parse a downloaded file from S3
+
+ Supports reading both gzipped files and plaintext files.
+
+ Args:
+     downloaded_s3_object (string): A full path to the downloaded file.
+
+ Yields:
+     [generator] A generator that yields lines from the downloaded s3 object
+ """
+ _, extension = os.path.splitext(downloaded_s3_object)
+
+ if extension == '.gz':
+     for line in gzip.open(downloaded_s3_object, 'r'):
+         yield line.rstrip()
+ else:
+     for line in open(downloaded_s3_object, 'r'):
+         yield line.rstrip()
+
+ # aws lambda apparently does not reallocate disk space when files are
+ # removed using os.remove(), so we must truncate them before removal
+ with open(downloaded_s3_object, 'w'):
+     pass
+
+ # remove the file
+ os.remove(downloaded_s3_object)
+ if not os.path.exists(downloaded_s3_object):
+     logger.debug('Removed temp file - %s', downloaded_s3_object)
+
@classmethod
def _download_s3_object(cls, client, bucket, key, size):
"""Download an object from S3.
@@ -121,40 +151,3 @@ def _download_s3_object(cls, client, bucket, key, size):
logger.debug('Completed download in %s seconds', round(end_time, 2))

return downloaded_s3_object
-
- @classmethod
- def _read_s3_file(cls, downloaded_s3_object):
- """Parse a downloaded file from S3
-
- Supports reading both gzipped files and plaintext files. Truncates
- files after reading to save space on /tmp mount.
-
- Args:
-     downloaded_s3_object (string): A full path to the downloaded file.
-
- Returns:
-     (list) Lines from the downloaded s3 object
- """
- lines = []
- filename, extension = os.path.splitext(downloaded_s3_object)
-
- if extension == '.gz':
-     with gzip.open(downloaded_s3_object, 'r') as f:
-         lines = f.readlines()
-     # truncate file
-     clear_file = gzip.open(downloaded_s3_object, 'w')
-     clear_file.close()
-
- else:
-     with open(downloaded_s3_object, 'r') as f:
-         lines = f.readlines()
-     # truncate file
-     clear_file = open(downloaded_s3_object, 'w')
-     clear_file.close()
-
- # remove file path
- os.remove(downloaded_s3_object)
- if not os.path.exists(downloaded_s3_object):
-     logger.debug('Removed temp file - %s', downloaded_s3_object)
-
- return lines
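The truncate-before-remove step in the new read_s3_file follows the code comment above: per the author, Lambda did not appear to return the space to the /tmp mount when files were removed with os.remove() alone. A standalone sketch of that cleanup pattern, with a free-space helper added purely for illustration (the helper and paths are assumptions, not part of this PR):

import os

def tmp_free_bytes(mount='/tmp'):
    # Free bytes on the mount, via os.statvfs (Unix only, which is what Lambda runs).
    stats = os.statvfs(mount)
    return stats.f_bavail * stats.f_frsize

def truncate_then_remove(path):
    # Mirrors the PR's cleanup: zero the file out first, then unlink it.
    with open(path, 'w'):
        pass
    os.remove(path)

# Example: compare tmp_free_bytes() before downloading and after cleanup to
# check that the space is actually reclaimed between invocations.

One related subtlety: the cleanup inside read_s3_file sits after the read loop, so it only runs once the generator has been fully exhausted; a caller that stops iterating early leaves the temporary file on disk.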
test/unit/test_pre_parsers.py (5 changes: 3 additions & 2 deletions)
@@ -63,7 +63,8 @@ def test_pre_parse_s3():
obj = s3_resource.Object(bucket_name, key_name)
obj.put(Body=body_value)

- parsed = StreamPreParsers.pre_parse_s3(raw_record)
- assert_equal(body_value, parsed[0])
+ s3_file = StreamPreParsers.pre_parse_s3(raw_record)
+ data = StreamPreParsers.read_s3_file(s3_file).next()
+ assert_equal(body_value, data)

BOTO_MOCKER.stop()
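The updated unit test exercises only the plaintext path through read_s3_file. A hypothetical companion test for the gzip branch might look like the following sketch; it bypasses S3 entirely and assumes the import path matches the module layout shown above:

import gzip
import os
import tempfile

from nose.tools import assert_equal

from stream_alert.rule_processor.pre_parsers import StreamPreParsers


def test_read_s3_file_gzip():
    # Write a small gzipped file to a temp directory, then stream it back.
    body_value = 'this is a gzipped log line'
    tmp_dir = tempfile.mkdtemp()
    gz_path = os.path.join(tmp_dir, 'sample.log.gz')
    with gzip.open(gz_path, 'w') as gz_file:
        gz_file.write(body_value + '\n')

    data = next(StreamPreParsers.read_s3_file(gz_path))
    assert_equal(body_value, data)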