feat: add firehose app (#48)
* feat: allow running integration tests

Remove the hard-coding of a single test when doing integration tests.
Fixed simple.tftest.hcl to pass.

* feat: add firehose app

This is a basic stack that sets up a Firehose delivery stream pointed at an S3
bucket. We will need Firehose for streaming both CloudWatch logs and metrics.
Since these streams have wildly different formats that cannot be interspersed
(gzipped vs. not), we will need separate delivery streams for each type.

* fix: add verbose flag

* fix: remove line wrap

* fix: add debugging to wait step
jta authored Nov 2, 2023
1 parent cb59cce commit 7293ae5
Showing 5 changed files with 265 additions and 1 deletion.
2 changes: 1 addition & 1 deletion .github/workflows/tests-integration.yaml
@@ -84,7 +84,7 @@ jobs:
        run: |
          eval $(dce leases login --print-creds $LEASE_ID)
          aws sts get-caller-identity
-         make integration-test
+         TEST_ARGS=-verbose make integration-test
        env:
          APP: forwarder
3 changes: 3 additions & 0 deletions apps/firehose/README.md
@@ -0,0 +1,3 @@
# Observe Firehose

This serverless application forwards data from a Firehose stream to S3.
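
For illustration only (not part of this commit), deploying the template directly with CloudFormation might look like the sketch below; the stack name, bucket ARN, and prefix are placeholders. Per the commit description, the template would typically be instantiated once per data type (for example one delivery stream for logs and one for metrics), distinguished via NameOverride.

# Hypothetical deployment; bucket ARN, stack name, and prefix are placeholders.
aws cloudformation deploy \
  --template-file apps/firehose/template.yaml \
  --stack-name observe-firehose-logs \
  --capabilities CAPABILITY_IAM \
  --parameter-overrides \
      BucketARN=arn:aws:s3:::my-log-bucket \
      Prefix=logs/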
132 changes: 132 additions & 0 deletions apps/firehose/template.yaml
@@ -0,0 +1,132 @@
---
AWSTemplateFormatVersion: '2010-09-09'
Description: 'Stream Firehose Records to S3.'
Metadata:
  AWS::ServerlessRepo::Application:
    Name: observe-cloudwatchlogs
    Description: Stream Firehose Records to S3.
    Author: Observe Inc
    SpdxLicenseId: Apache-2.0
    ReadmeUrl: README.md
    HomePageUrl: https://github.com/observeinc/aws-sam-testing
    SemanticVersion: '0.0.1'
    SourceCodeUrl: https://github.com/observeinc/aws-sam-testing

Parameters:
  BucketARN:
    Type: String
    Description: >-
      S3 Bucket ARN to write log records to.
  Prefix:
    Type: String
    Description: >-
      Optional prefix to write log records to.
    Default: ''
  NameOverride:
    Type: String
    Description: >-
      Set Firehose Delivery Stream name. In the absence of a value, the stack
      name will be used.
    Default: ''
  BufferingInterval:
    Type: Number
    Default: 60
    MinValue: 60
    MaxValue: 900
    Description: |
      Buffer incoming data for the specified period of time, in seconds, before
      delivering it to the destination.
  BufferingSize:
    Type: Number
    Default: 1
    MinValue: 1
    MaxValue: 64
    Description: |
      Buffer incoming data to the specified size, in MiBs, before delivering it
      to the destination.
Conditions:
  UseStackName: !Equals
    - !Ref NameOverride
    - ''

Resources:
  Role:
    Type: 'AWS::IAM::Role'
    Properties:
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - firehose.amazonaws.com
            Action:
              - 'sts:AssumeRole'
      Path: /
      Policies:
        - PolicyName: logging
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - logs:CreateLogStream
                  - logs:PutLogEvents
                Resource: !GetAtt LogGroup.Arn
        - PolicyName: s3writer
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - s3:AbortMultipartUpload
                  - s3:GetBucketLocation
                  - s3:GetObject
                  - s3:ListBucket
                  - s3:ListBucketMultipartUploads
                  - s3:PutObject
                Resource:
                  - !Ref BucketARN
                  - !Sub '${BucketARN}/${Prefix}*'
  LogGroup:
    Type: 'AWS::Logs::LogGroup'
    Properties:
      LogGroupName: !Join
        - ''
        - - /aws/firehose/
          - !If
            - UseStackName
            - !Ref AWS::StackName
            - !Ref NameOverride
      RetentionInDays: 365
  LogStream:
    Type: 'AWS::Logs::LogStream'
    Properties:
      LogStreamName: s3logs
      LogGroupName: !Ref LogGroup
  DeliveryStream:
    Type: 'AWS::KinesisFirehose::DeliveryStream'
    Properties:
      DeliveryStreamName: !If
        - UseStackName
        - !Ref AWS::StackName
        - !Ref NameOverride
      DeliveryStreamType: DirectPut
      S3DestinationConfiguration:
        BucketARN: !Ref BucketARN
        RoleARN: !GetAtt Role.Arn
        Prefix: !Ref Prefix
        ErrorOutputPrefix: !Ref Prefix
        BufferingHints:
          IntervalInSeconds: !Ref BufferingInterval
          SizeInMBs: !Ref BufferingSize
        CloudWatchLoggingOptions:
          Enabled: true
          LogGroupName: !Ref LogGroup
          LogStreamName: !Ref LogStream

Outputs:
  Firehose:
    Description: 'Firehose ARN'
    Value: !GetAtt 'DeliveryStream.Arn'
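
To illustrate how a deployed stack is exercised (not part of this commit; the stream name and bucket are placeholders), a record can be pushed with the AWS CLI. With the default S3 destination settings, Firehose writes objects under the configured Prefix followed by a YYYY/MM/dd/HH/ UTC key layout once the buffering hints are satisfied, which is the layout the integration script below relies on.

# Placeholder stream name and bucket; AWS CLI v2 expects the Data blob as base64.
aws firehose put-record \
  --delivery-stream-name observe-firehose-logs \
  --record Data="$(echo 'hello firehose' | base64)"

# Objects land under <Prefix>YYYY/MM/dd/HH/..., so scope the listing to the current year.
aws s3 ls "s3://my-log-bucket/$(date -u +%Y)" --recursive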
70 changes: 70 additions & 0 deletions integration/firehose.tftest.hcl
@@ -0,0 +1,70 @@
run "setup" {
module {
source = "./modules/setup/run"
}
}

run "install" {
variables {
name = run.setup.id
app = "firehose"
parameters = {
BucketARN = "arn:aws:s3:::${run.setup.access_point.bucket}"
}
capabilities = [
"CAPABILITY_IAM",
]
}
}

run "check_firehose" {
module {
source = "./modules/exec"
}

variables {
command = "./scripts/check_firehose"
env_vars = {
FIREHOSE_ARN = run.install.stack.outputs["Firehose"]
DESTINATION = "s3://${run.setup.access_point.bucket}/"
}
}

assert {
condition = output.error == ""
error_message = "Failed to write firehose records"
}
}

run "set_prefix" {
variables {
name = run.setup.id
app = "firehose"
parameters = {
BucketARN = "arn:aws:s3:::${run.setup.access_point.bucket}"
Prefix = "${run.setup.id}/"
}
capabilities = [
"CAPABILITY_IAM",
]
}
}

run "check_firehose_prefix" {
module {
source = "./modules/exec"
}

variables {
command = "./scripts/check_firehose"
env_vars = {
FIREHOSE_ARN = run.install.stack.outputs["Firehose"]
DESTINATION = "s3://${run.setup.access_point.bucket}/${run.setup.id}/"
}
}

assert {
condition = output.error == ""
error_message = "Failed to write firehose records"
}
}
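
As a rough sketch (the Makefile is not part of this diff, so the exact wiring is an assumption), the new test file could be run on its own with Terraform's test command; in CI the -verbose flag is passed through via TEST_ARGS, as shown in the workflow change above.

# Assumption: the integration-test make target wraps `terraform test`,
# so TEST_ARGS=-verbose in CI maps onto the flag below.
cd integration
terraform test -filter=firehose.tftest.hcl -verbose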
59 changes: 59 additions & 0 deletions integration/scripts/check_firehose
@@ -0,0 +1,59 @@
#!/bin/bash
# Write records to Firehose and verify that data shows up in S3.
set -euo pipefail

DIE() { echo "$*" 1>&2; exit 1; }

[[ ! -z "${FIREHOSE_ARN:-}" ]] || DIE "FIREHOSE_ARN not set"
[[ ! -z "${DESTINATION:-}" ]] || DIE "DESTINATION not set"

cleanup() {
    rm -f "$TMPFILE"
}

trap cleanup EXIT

TMPFILE=$(mktemp)

# Assuming a 1MB buffer limit, writing 2 x 500KB records will immediately flush
# a file to S3.
RECORD_SIZE=${RECORD_SIZE:-500k}
RECORD_COUNT=${RECORD_COUNT:-2}

aws s3 ls ${DESTINATION}`date +%Y` --recursive && DIE "S3 destination already has records"

FIREHOSE_NAME=$(echo "$FIREHOSE_ARN" | cut -d/ -f2)
AWS_REGION=$(echo "$FIREHOSE_ARN" | cut -d: -f4)

# base64 on Linux wraps lines by default. Using tr makes the script agnostic to this behavior.
RANDOM_DATA=$(dd if=/dev/urandom bs=${RECORD_SIZE} count=1 2>/dev/null | base64 | tr -d \\n)

echo "[" > ${TMPFILE}
for ((i = 1; i <= ${RECORD_COUNT}; i++)); do
    if [ $i -gt 1 ]; then
        echo "," >> ${TMPFILE}
    fi
    echo "{\"Data\":\"${RANDOM_DATA}\"}" >> ${TMPFILE}
done
echo "]" >> ${TMPFILE}

aws firehose put-record-batch \
    --delivery-stream-name "${FIREHOSE_NAME}" \
    --records file://${TMPFILE} \
    --region ${AWS_REGION} \
    --no-cli-pager

CHECK_INTERVAL=${CHECK_INTERVAL:-5}
CHECK_TIMEOUT=${CHECK_TIMEOUT:-120}

# Wait up to `CHECK_TIMEOUT` seconds for a file to appear.
# A file can take quite a long time to flush after reconfiguring a firehose in
# particular.
for i in $(seq 0 ${CHECK_INTERVAL} ${CHECK_TIMEOUT}); do
    if [ $i -gt 0 ]; then
        echo "waiting"
        sleep ${CHECK_INTERVAL}
    fi
    aws s3 ls ${DESTINATION}`date +%Y` --recursive && exit
done
DIE "Records not found"
