feat: add firehose app #48

Merged · 5 commits · Nov 2, 2023
2 changes: 1 addition & 1 deletion .github/workflows/tests-integration.yaml
@@ -84,7 +84,7 @@ jobs:
        run: |
          eval $(dce leases login --print-creds $LEASE_ID)
          aws sts get-caller-identity
-         make integration-test
+         TEST_ARGS=-verbose make integration-test
        env:
          APP: forwarder

14 changes: 7 additions & 7 deletions Makefile
@@ -42,12 +42,12 @@ go-test:
	go test -v -race ./...

.PHONY: integration-test
-integration-test: sam-package
+integration-test: sam-package-all
	cd integration && terraform init && \
	if [ "$(DEBUG)" = "1" ]; then \
-		CHECK_DEBUG_FILE=debug.sh terraform test -filter=tests/forwarder.tftest.hcl -verbose; \
+		CHECK_DEBUG_FILE=debug.sh terraform test $(TEST_ARGS); \
	else \
-		terraform test -filter=tests/forwarder.tftest.hcl; \
+		terraform test $(TEST_ARGS); \
	fi
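
With the hard-coded `-filter` flag gone, callers now choose what `terraform test` runs via `TEST_ARGS`. A sketch of the two invocations this enables — the first is exactly what the updated CI workflow does, the second restores the previous behaviour:

```sh
# Run the whole suite verbosely, as CI now does
TEST_ARGS=-verbose make integration-test

# Restore the old behaviour: only the forwarder tests, verbose
TEST_ARGS='-filter=tests/forwarder.tftest.hcl -verbose' make integration-test
```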

## sam-validate: validate cloudformation templates
@@ -60,8 +60,8 @@ sam-validate:

## sam-validate-all: validate all cloudformation templates
sam-validate-all:
-	for dir in $(SUBDIR); do
-		APP=$$dir $(MAKE) sam-validate || exit 1;
+	@ for dir in $(SUBDIR); do \
+		APP=$$dir $(MAKE) sam-validate || exit 1; \
	done

.PHONY: sam-build-all
@@ -92,8 +92,8 @@ sam-publish: sam-package

## sam-package-all: package all cloudformation templates and push assets to S3
sam-package-all:
-	for dir in $(SUBDIR); do
-		APP=$$dir $(MAKE) sam-package || exit 1;
+	@ for dir in $(SUBDIR); do \
+		APP=$$dir $(MAKE) sam-package || exit 1; \
	done

## sam-package: package cloudformation templates and push assets to S3
3 changes: 3 additions & 0 deletions apps/firehose/README.md
@@ -0,0 +1,3 @@
# Observe Firehose

This serverless application forwards data from a Firehose stream to S3.
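
For context beyond the diff: once validated, the app can be stood up directly with the SAM CLI. A minimal sketch, assuming the parameter names defined in `template.yaml` below; the stack name and bucket ARN are placeholders:

```sh
# Hypothetical stack and bucket names; CAPABILITY_IAM is required because
# the template creates an IAM role
sam deploy \
  --template-file apps/firehose/template.yaml \
  --stack-name firehose-smoke-test \
  --capabilities CAPABILITY_IAM \
  --parameter-overrides BucketARN=arn:aws:s3:::my-example-bucket Prefix=firehose/
```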
132 changes: 132 additions & 0 deletions apps/firehose/template.yaml
@@ -0,0 +1,132 @@
---
AWSTemplateFormatVersion: '2010-09-09'
Description: 'Stream Firehose Records to S3.'
Metadata:
  AWS::ServerlessRepo::Application:
    Name: observe-firehose
    Description: Stream Firehose Records to S3.
    Author: Observe Inc
    SpdxLicenseId: Apache-2.0
    ReadmeUrl: README.md
    HomePageUrl: https://github.com/observeinc/aws-sam-testing
    SemanticVersion: '0.0.1'
    SourceCodeUrl: https://github.com/observeinc/aws-sam-testing

Parameters:
  BucketARN:
    Type: String
    Description: >-
      S3 Bucket ARN to write log records to.
  Prefix:
    Type: String
    Description: >-
      Optional prefix to write log records to.
    Default: ''
  NameOverride:
    Type: String
    Description: >-
      Set Firehose Delivery Stream name. In the absence of a value, the stack
      name will be used.
    Default: ''
  BufferingInterval:
    Type: Number
    Default: 60
    MinValue: 60
    MaxValue: 900
    Description: |
      Buffer incoming data for the specified period of time, in seconds, before
      delivering it to the destination.
  BufferingSize:
    Type: Number
    Default: 1
    MinValue: 1
    MaxValue: 64
    Description: |
      Buffer incoming data to the specified size, in MiBs, before delivering it
      to the destination.

Conditions:
  UseStackName: !Equals
    - !Ref NameOverride
    - ''

Resources:
  Role:
    Type: 'AWS::IAM::Role'
    Properties:
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - firehose.amazonaws.com
            Action:
              - 'sts:AssumeRole'
      Path: /
      Policies:
        - PolicyName: logging
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - logs:CreateLogStream
                  - logs:PutLogEvents
                Resource: !GetAtt LogGroup.Arn
        - PolicyName: s3writer
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - s3:AbortMultipartUpload
                  - s3:GetBucketLocation
                  - s3:GetObject
                  - s3:ListBucket
                  - s3:ListBucketMultipartUploads
                  - s3:PutObject
                Resource:
                  - !Ref BucketARN
                  - !Sub '${BucketARN}/${Prefix}*'
  LogGroup:
    Type: 'AWS::Logs::LogGroup'
    Properties:
      LogGroupName: !Join
        - ''
        - - /aws/firehose/
          - !If
            - UseStackName
            - !Ref AWS::StackName
            - !Ref NameOverride
      RetentionInDays: 365
  LogStream:
    Type: 'AWS::Logs::LogStream'
    Properties:
      LogStreamName: s3logs
      LogGroupName: !Ref LogGroup
  DeliveryStream:
    Type: 'AWS::KinesisFirehose::DeliveryStream'
    Properties:
      DeliveryStreamName: !If
        - UseStackName
        - !Ref AWS::StackName
        - !Ref NameOverride
      DeliveryStreamType: DirectPut
      S3DestinationConfiguration:
        BucketARN: !Ref BucketARN
        RoleARN: !GetAtt Role.Arn
        Prefix: !Ref Prefix
        ErrorOutputPrefix: !Ref Prefix
        BufferingHints:
          IntervalInSeconds: !Ref BufferingInterval
          SizeInMBs: !Ref BufferingSize
        CloudWatchLoggingOptions:
          Enabled: true
          LogGroupName: !Ref LogGroup
          LogStreamName: !Ref LogStream

Outputs:
  Firehose:
    Description: 'Firehose ARN'
    Value: !GetAtt 'DeliveryStream.Arn'
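
Since the stream is `DeliveryStreamType: DirectPut`, records are written straight to Firehose rather than read from a Kinesis source. A sketch of writing a single record with the AWS CLI, assuming the stack was created with `NameOverride=my-firehose` (placeholder name):

```sh
# Firehose record data must be base64-encoded; the stream name is a placeholder
aws firehose put-record \
  --delivery-stream-name my-firehose \
  --record Data="$(echo -n 'hello world' | base64)"
```

With the default `BufferingHints` above, the record should land in S3 within roughly 60 seconds, or sooner once 1 MiB has accumulated.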
70 changes: 70 additions & 0 deletions integration/firehose.tftest.hcl
@@ -0,0 +1,70 @@
run "setup" {
module {
source = "./modules/setup/run"
}
}

run "install" {
variables {
name = run.setup.id
app = "firehose"
parameters = {
BucketARN = "arn:aws:s3:::${run.setup.access_point.bucket}"
}
capabilities = [
"CAPABILITY_IAM",
]
}
}

run "check_firehose" {
module {
source = "./modules/exec"
}

variables {
command = "./scripts/check_firehose"
env_vars = {
FIREHOSE_ARN = run.install.stack.outputs["Firehose"]
DESTINATION = "s3://${run.setup.access_point.bucket}/"
}
}

assert {
condition = output.error == ""
error_message = "Failed to write firehose records"
}
}

run "set_prefix" {
variables {
name = run.setup.id
app = "firehose"
parameters = {
BucketARN = "arn:aws:s3:::${run.setup.access_point.bucket}"
Prefix = "${run.setup.id}/"
}
capabilities = [
"CAPABILITY_IAM",
]
}
}

run "check_firehose_prefix" {
module {
source = "./modules/exec"
}

variables {
command = "./scripts/check_firehose"
env_vars = {
FIREHOSE_ARN = run.install.stack.outputs["Firehose"]
DESTINATION = "s3://${run.setup.access_point.bucket}/${run.setup.id}/"
}
}

assert {
condition = output.error == ""
error_message = "Failed to write firehose records"
}
}
59 changes: 59 additions & 0 deletions integration/scripts/check_firehose
@@ -0,0 +1,59 @@
#!/bin/bash
# Write records to firehose, verify data shows up in S3
set -euo pipefail

DIE() { echo "$*" 1>&2; exit 1; }

[[ ! -z "${FIREHOSE_ARN:-}" ]] || DIE "FIREHOSE_ARN not set"
[[ ! -z "${DESTINATION:-}" ]] || DIE "DESTINATION not set"

cleanup() {
    rm -f "$TMPFILE"
}

trap cleanup EXIT

TMPFILE=$(mktemp)

# Assuming a 1MB buffer limit, writing 2 x 500KB records will immediately flush
# a file to S3.
RECORD_SIZE=${RECORD_SIZE:-500k}
RECORD_COUNT=${RECORD_COUNT:-2}

aws s3 ls ${DESTINATION}`date +%Y` --recursive && DIE "S3 destination already has records"

FIREHOSE_NAME=$(echo "$FIREHOSE_ARN" | cut -d/ -f2)
AWS_REGION=$(echo "$FIREHOSE_ARN" | cut -d: -f4)

# base64 in linux sets a default line wrap. Using tr makes script agnostic to this behavior.
RANDOM_DATA=$(dd if=/dev/urandom bs=${RECORD_SIZE} count=1 2>/dev/null | base64 | tr -d \\n)

echo "[" > ${TMPFILE}
for ((i = 1; i <= ${RECORD_COUNT}; i++)); do
    if [ $i -gt 1 ]; then
        echo "," >> ${TMPFILE}
    fi
    echo "{\"Data\":\"${RANDOM_DATA}\"}" >> ${TMPFILE}
done
echo "]" >> ${TMPFILE}

aws firehose put-record-batch \
    --delivery-stream-name "${FIREHOSE_NAME}" \
    --records file://${TMPFILE} \
    --region ${AWS_REGION} \
    --no-cli-pager

CHECK_INTERVAL=${CHECK_INTERVAL:-5}
CHECK_TIMEOUT=${CHECK_TIMEOUT:-120}

# Wait up to `CHECK_TIMEOUT` seconds for file to appear
# A file can take quite a long time to flush after reconfiguring a firehose in
# particular.
for i in $(seq 0 ${CHECK_INTERVAL} ${CHECK_TIMEOUT}); do
    if [ $i -gt 0 ]; then
        echo "waiting"
        sleep ${CHECK_INTERVAL}
    fi
    aws s3 ls ${DESTINATION}`date +%Y` --recursive && exit
done
DIE "Records not found"
6 changes: 3 additions & 3 deletions integration/tests/simple.tftest.hcl
@@ -12,12 +12,12 @@ run "check" {
  variables {
    command = "./scripts/check_bucket_not_empty"
    env_vars = {
-     SOURCE = run.setup.source.bucket
+     SOURCE = run.setup.access_point.bucket
    }
  }

  assert {
-   condition     = output.exitcode == 0
-   error_message = "Bucket not empty check failed"
+   condition     = output.exitcode != 0
+   error_message = "Bucket isn't empty"
  }
}