Skip to content

Commit

Permalink
adding connect and read timeouts for firehose client (#736)
Browse files Browse the repository at this point in the history
* adding connect and read timeouts for firehose client

* adding logger that will track FailedPutCount values for firehose
  • Loading branch information
ryandeivert authored May 15, 2018
1 parent af596ea commit cbf72ba
Showing 1 changed file with 19 additions and 5 deletions.
24 changes: 19 additions & 5 deletions stream_alert/rule_processor/firehose.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import backoff
import boto3
from botocore import client
from botocore.exceptions import ClientError
from botocore.vendored.requests.exceptions import ConnectionError

Expand All @@ -45,10 +46,18 @@ class StreamAlertFirehose(object):
MAX_BATCH_SIZE = 4000 * 1000
# The subtraction of 2 accounts for the newline at the end
MAX_RECORD_SIZE = 1000 * 1000 - 2
# Set a boto connect and read timeout in an attempt to shorten the time it takes to
# send to firehose. This will effectively cause retries to happen quicker
BOTO_TIMEOUT = 5

def __init__(self, region, firehose_config, log_sources):
self._region = region
self._firehose_client = boto3.client('firehose', region_name=self._region)
boto_config = client.Config(
connect_timeout=self.BOTO_TIMEOUT,
read_timeout=self.BOTO_TIMEOUT,
region_name=self._region
)
self._firehose_client = boto3.client('firehose', config=boto_config)
# Expand enabled logs into specific subtypes
self._enabled_logs = self._load_enabled_log_sources(firehose_config, log_sources)

Expand Down Expand Up @@ -180,13 +189,18 @@ def _firehose_request_helper(self, stream_name, record_batch):
on_giveup=giveup_handler())
def firehose_request_wrapper(data):
"""Firehose request wrapper to use with backoff"""
LOGGER.info('[Firehose] Sending %d records to %s',
record_batch_size,
stream_name)
return self._firehose_client.put_record_batch(
LOGGER.info('[Firehose] Sending %d records to %s', record_batch_size, stream_name)

response = self._firehose_client.put_record_batch(
DeliveryStreamName=stream_name,
Records=data)

# Log this as an error for now so it can be picked up in logs
if response['FailedPutCount'] > 0:
LOGGER.error('Receieved non-zero FailedPutCount: %d', response['FailedPutCount'])

return response

# The newline at the end is required by Firehose,
# otherwise all records will be on a single line and
# unsearchable in Athena.
Expand Down

0 comments on commit cbf72ba

Please sign in to comment.