Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adding connect and read timeouts for firehose client #736

Merged
merged 2 commits into from
May 15, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 19 additions & 5 deletions stream_alert/rule_processor/firehose.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import backoff
import boto3
from botocore import client
from botocore.exceptions import ClientError
from botocore.vendored.requests.exceptions import ConnectionError

Expand All @@ -45,10 +46,18 @@ class StreamAlertFirehose(object):
MAX_BATCH_SIZE = 4000 * 1000
# The subtraction of 2 accounts for the newline at the end
MAX_RECORD_SIZE = 1000 * 1000 - 2
# Set a boto connect and read timeout in an attempt to shorten the time it takes to
# send to firehose. This will effectively cause retries to happen quicker
BOTO_TIMEOUT = 5

def __init__(self, region, firehose_config, log_sources):
self._region = region
self._firehose_client = boto3.client('firehose', region_name=self._region)
boto_config = client.Config(
connect_timeout=self.BOTO_TIMEOUT,
read_timeout=self.BOTO_TIMEOUT,
region_name=self._region
)
self._firehose_client = boto3.client('firehose', config=boto_config)
# Expand enabled logs into specific subtypes
self._enabled_logs = self._load_enabled_log_sources(firehose_config, log_sources)

Expand Down Expand Up @@ -180,13 +189,18 @@ def _firehose_request_helper(self, stream_name, record_batch):
on_giveup=giveup_handler())
def firehose_request_wrapper(data):
"""Firehose request wrapper to use with backoff"""
LOGGER.info('[Firehose] Sending %d records to %s',
record_batch_size,
stream_name)
return self._firehose_client.put_record_batch(
LOGGER.info('[Firehose] Sending %d records to %s', record_batch_size, stream_name)

response = self._firehose_client.put_record_batch(
DeliveryStreamName=stream_name,
Records=data)

# Log this as an error for now so it can be picked up in logs
if response['FailedPutCount'] > 0:
LOGGER.error('Receieved non-zero FailedPutCount: %d', response['FailedPutCount'])

return response

# The newline at the end is required by Firehose,
# otherwise all records will be on a single line and
# unsearchable in Athena.
Expand Down