[lambda][rule] small optimization to how s3 files are read/parsed
ryandeivert committed Apr 28, 2017
1 parent fe8e704 commit cf69fd7
Showing 2 changed files with 29 additions and 42 deletions.
stream_alert/rule_processor/handler.py (5 changes: 2 additions & 3 deletions)

@@ -78,9 +78,8 @@ def _kinesis_process(self, payload, classifier):

     def _s3_process(self, payload, classifier):
         """Process S3 data for alerts"""
-        s3_file_lines = StreamPreParsers.pre_parse_s3(payload.raw_record)
-        for line in s3_file_lines:
-            data = line.rstrip()
+        s3_file = StreamPreParsers.pre_parse_s3(payload.raw_record)
+        for data in StreamPreParsers.read_s3_file(s3_file):
             payload.refresh_record(data)
             self.process_alerts(classifier, payload, data)

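The handler-side change above swaps a fully materialized list of lines for lazy iteration: pre_parse_s3 now returns only the path to the downloaded file, and the new read_s3_file generator yields one stripped line at a time. A minimal standalone sketch (not StreamAlert code; names and the file path are hypothetical) contrasting the two access patterns:

# Sketch of eager vs. lazy line reading; names here are hypothetical.

def read_all_lines(path):
    """Eager pattern (old): every line is held in memory as a list."""
    with open(path, 'r') as f:
        return [line.rstrip() for line in f]

def iter_lines(path):
    """Lazy pattern (new): one line in memory at a time."""
    with open(path, 'r') as f:
        for line in f:
            yield line.rstrip()

# Processing looks the same either way, but the lazy form keeps memory
# flat no matter how large the downloaded S3 object is:
# for data in iter_lines('/tmp/example.log'):
#     process(data)

For large S3 objects processed inside a memory-constrained Lambda function, avoiding the intermediate list is presumably the optimization the commit message refers to.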
stream_alert/rule_processor/pre_parsers.py (66 changes: 27 additions & 39 deletions)

@@ -13,7 +13,6 @@
 See the License for the specific language governing permissions and
 limitations under the License.
 '''
-
 import base64
 import gzip
 import logging
@@ -65,7 +64,7 @@ def pre_parse_s3(cls, raw_record):
         size = int(raw_record['s3']['object']['size'])
         downloaded_s3_object = cls._download_s3_object(client, bucket, key, size)
 
-        return cls._read_s3_file(downloaded_s3_object)
+        return downloaded_s3_object
 
     @classmethod
     def pre_parse_sns(cls, raw_record):
@@ -78,6 +77,32 @@ def pre_parse_sns(cls, raw_record):
"""
return base64.b64decode(raw_record['Sns']['Message'])

@classmethod
def read_s3_file(cls, downloaded_s3_object):
"""Parse a downloaded file from S3
Supports reading both gzipped files and plaintext files.
Args:
downloaded_s3_object (string): A full path to the downloaded file.
Returns:
(list) Lines from the downloaded s3 object
"""
_, extension = os.path.splitext(downloaded_s3_object)

if extension == '.gz':
for line in gzip.open(downloaded_s3_object, 'r'):
yield line.rstrip()
else:
for line in open(downloaded_s3_object, 'r'):
yield line.rstrip()

# remove the file
os.remove(downloaded_s3_object)
if not os.path.exists(downloaded_s3_object):
logger.debug('Removed temp file - %s', downloaded_s3_object)

@classmethod
def _download_s3_object(cls, client, bucket, key, size):
"""Download an object from S3.
@@ -121,40 +146,3 @@ def _download_s3_object(cls, client, bucket, key, size):
         logger.debug('Completed download in %s seconds', round(end_time, 2))
 
         return downloaded_s3_object
-
-    @classmethod
-    def _read_s3_file(cls, downloaded_s3_object):
-        """Parse a downloaded file from S3
-
-        Supports reading both gzipped files and plaintext files. Truncates
-        files after reading to save space on /tmp mount.
-
-        Args:
-            downloaded_s3_object (string): A full path to the downloaded file.
-
-        Returns:
-            (list) Lines from the downloaded s3 object
-        """
-        lines = []
-        filename, extension = os.path.splitext(downloaded_s3_object)
-
-        if extension == '.gz':
-            with gzip.open(downloaded_s3_object, 'r') as f:
-                lines = f.readlines()
-            # truncate file
-            clear_file = gzip.open(downloaded_s3_object, 'w')
-            clear_file.close()
-
-        else:
-            with open(downloaded_s3_object, 'r') as f:
-                lines = f.readlines()
-            # truncate file
-            clear_file = open(downloaded_s3_object, 'w')
-            clear_file.close()
-
-        # remove file path
-        os.remove(downloaded_s3_object)
-        if not os.path.exists(downloaded_s3_object):
-            logger.debug('Removed temp file - %s', downloaded_s3_object)
-
-        return lines
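
One subtlety of the new generator-based read_s3_file is worth noting: the cleanup at the bottom of the function (os.remove) only runs once the generator is fully exhausted, whereas the removed _read_s3_file truncated and deleted the file before returning. A short runnable sketch (standalone, not StreamAlert code) demonstrating that behavior:

import os
import tempfile

def stream_lines(path):
    # Same shape as read_s3_file above: yield stripped lines, then delete the file.
    with open(path, 'r') as handle:
        for line in handle:
            yield line.rstrip()
    os.remove(path)  # executes only after the final line is consumed

# Hypothetical temp file standing in for a downloaded S3 object:
fd, path = tempfile.mkstemp()
with os.fdopen(fd, 'w') as f:
    f.write('line one\nline two\n')

gen = stream_lines(path)
print(next(gen))                 # 'line one'; the temp file still exists
assert os.path.exists(path)
for line in gen:                 # exhausting the generator triggers os.remove
    print(line)
assert not os.path.exists(path)

A consumer that abandons iteration early (for example, on an exception raised in process_alerts) would leave the temp file on /tmp; the for loop in _s3_process exhausts the generator, so the file is removed in the normal case.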
