Merge pull request #126 from airbnb/ryandeivert-s3-optimizations
[lambda][rule] some s3 optimizations
ryandeivert authored Apr 28, 2017
2 parents 2b79e91 + ce14db3 commit 51b4cc5
Showing 3 changed files with 37 additions and 44 deletions.
5 changes: 2 additions & 3 deletions stream_alert/rule_processor/handler.py
@@ -78,9 +78,8 @@ def _kinesis_process(self, payload, classifier):

     def _s3_process(self, payload, classifier):
         """Process S3 data for alerts"""
-        s3_file_lines = StreamPreParsers.pre_parse_s3(payload.raw_record)
-        for line in s3_file_lines:
-            data = line.rstrip()
+        s3_file = StreamPreParsers.pre_parse_s3(payload.raw_record)
+        for data in StreamPreParsers.read_s3_file(s3_file):
             payload.refresh_record(data)
             self.process_alerts(classifier, payload, data)

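For context on the handler change: the old _s3_process pulled every line of the S3 object into memory at once (pre_parse_s3 returned a list of lines), while the new version streams it, since pre_parse_s3 now returns the path of the downloaded file and read_s3_file yields one stripped line at a time. A minimal standalone sketch of the difference, not StreamAlert's actual code; the file path and process() are hypothetical stand-ins:

    def process(data):
        print(data)  # stand-in for classification / rule evaluation

    # Eager (old approach): readlines() holds the whole file in memory,
    # so peak memory grows with the size of the S3 object.
    with open('/tmp/example_object', 'r') as f:
        for line in f.readlines():
            process(line.rstrip())

    # Lazy (new approach): a generator yields one stripped line at a time,
    # keeping memory roughly flat for arbitrarily large objects.
    def read_lines(path):
        with open(path, 'r') as f:
            for line in f:
                yield line.rstrip()

    for data in read_lines('/tmp/example_object'):
        process(data)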
71 changes: 32 additions & 39 deletions stream_alert/rule_processor/pre_parsers.py
@@ -13,7 +13,6 @@
 See the License for the specific language governing permissions and
 limitations under the License.
 '''
-
 import base64
 import gzip
 import logging
@@ -65,7 +64,7 @@ def pre_parse_s3(cls, raw_record):
         size = int(raw_record['s3']['object']['size'])
         downloaded_s3_object = cls._download_s3_object(client, bucket, key, size)

-        return cls._read_s3_file(downloaded_s3_object)
+        return downloaded_s3_object

     @classmethod
     def pre_parse_sns(cls, raw_record):
@@ -78,6 +77,37 @@ def pre_parse_sns(cls, raw_record):
         """
         return base64.b64decode(raw_record['Sns']['Message'])

+    @classmethod
+    def read_s3_file(cls, downloaded_s3_object):
+        """Parse a downloaded file from S3
+
+        Supports reading both gzipped files and plaintext files.
+
+        Args:
+            downloaded_s3_object (string): A full path to the downloaded file.
+
+        Yields:
+            [generator] A generator that yields lines from the downloaded s3 object
+        """
+        _, extension = os.path.splitext(downloaded_s3_object)
+
+        if extension == '.gz':
+            for line in gzip.open(downloaded_s3_object, 'r'):
+                yield line.rstrip()
+        else:
+            for line in open(downloaded_s3_object, 'r'):
+                yield line.rstrip()
+
+        # aws lambda apparently does not reallocate disk space when files are
+        # removed using os.remove(), so we must truncate them before removal
+        with open(downloaded_s3_object, 'w'):
+            pass
+
+        # remove the file
+        os.remove(downloaded_s3_object)
+        if not os.path.exists(downloaded_s3_object):
+            logger.debug('Removed temp file - %s', downloaded_s3_object)
+
     @classmethod
     def _download_s3_object(cls, client, bucket, key, size):
         """Download an object from S3.
@@ -121,40 +151,3 @@ def _download_s3_object(cls, client, bucket, key, size):
         logger.debug('Completed download in %s seconds', round(end_time, 2))

         return downloaded_s3_object
-
-    @classmethod
-    def _read_s3_file(cls, downloaded_s3_object):
-        """Parse a downloaded file from S3
-
-        Supports reading both gzipped files and plaintext files. Truncates
-        files after reading to save space on /tmp mount.
-
-        Args:
-            downloaded_s3_object (string): A full path to the downloaded file.
-
-        Returns:
-            (list) Lines from the downloaded s3 object
-        """
-        lines = []
-        filename, extension = os.path.splitext(downloaded_s3_object)
-
-        if extension == '.gz':
-            with gzip.open(downloaded_s3_object, 'r') as f:
-                lines = f.readlines()
-            # truncate file
-            clear_file = gzip.open(downloaded_s3_object, 'w')
-            clear_file.close()
-
-        else:
-            with open(downloaded_s3_object, 'r') as f:
-                lines = f.readlines()
-            # truncate file
-            clear_file = open(downloaded_s3_object, 'w')
-            clear_file.close()
-
-        # remove file path
-        os.remove(downloaded_s3_object)
-        if not os.path.exists(downloaded_s3_object):
-            logger.debug('Removed temp file - %s', downloaded_s3_object)
-
-        return lines
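The net result in pre_parsers.py is a two-step API: pre_parse_s3 downloads the object and returns its local path, and the new read_s3_file generator streams the contents back, truncating and deleting the temp file after the last line is yielded. A sketch of the intended call pattern, assuming raw_record is an S3 event notification record and handle_line() is a hypothetical consumer:

    from stream_alert.rule_processor.pre_parsers import StreamPreParsers

    s3_file = StreamPreParsers.pre_parse_s3(raw_record)   # download to /tmp, return the path
    for data in StreamPreParsers.read_s3_file(s3_file):   # generator of stripped lines
        handle_line(data)                                 # hypothetical per-line processing

One consequence of moving cleanup into the generator: the truncate-and-remove code sits after the yield loop, so it runs only once the caller fully exhausts the generator; a consumer that breaks out early would leave the file on the /tmp mount until the Lambda container is recycled.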
5 changes: 3 additions & 2 deletions test/unit/test_pre_parsers.py
@@ -63,7 +63,8 @@ def test_pre_parse_s3():
     obj = s3_resource.Object(bucket_name, key_name)
     obj.put(Body=body_value)

-    parsed = StreamPreParsers.pre_parse_s3(raw_record)
-    assert_equal(body_value, parsed[0])
+    s3_file = StreamPreParsers.pre_parse_s3(raw_record)
+    data = StreamPreParsers.read_s3_file(s3_file).next()
+    assert_equal(body_value, data)

     BOTO_MOCKER.stop()
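One portability note on the updated test: .next() is the Python 2-only generator method (the codebase targeted Python 2.7 at the time, as the call itself suggests) and was removed in Python 3, where the equivalent is the next() builtin:

    # Version-agnostic spelling of the same assertion:
    data = next(StreamPreParsers.read_s3_file(s3_file))
    assert_equal(body_value, data)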
