Skip to content

Commit

Permalink
OOTB support AWS Eventbridge (#40006)
Browse files Browse the repository at this point in the history
Adding code to OOTB support AWS Eventbridge generated events for S3 changes, see https://docs.aws.amazon.com/AmazonS3/latest/userguide/EventBridge.html
  • Loading branch information
mjmbischoff authored Sep 13, 2024
1 parent 885a2db commit c37159e
Show file tree
Hide file tree
Showing 10 changed files with 323 additions and 79 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,8 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Allow elision of set and append failure logging. {issue}34544[34544] {pull}39929[39929]
- Add ability to remove request trace logs from CEL input. {pull}39969[39969]
- Add ability to remove request trace logs from HTTPJSON input. {pull}40003[40003]
- Update CEL mito extensions to v1.13.0. {pull}40035[40035]
- Added out of the box support for Amazon EventBridge notifications over SQS to S3 input {pull}40006[40006]
- Update CEL mito extensions to v1.13.0 {pull}40035[40035]
- Add Jamf entity analytics provider. {pull}39996[39996]
- Add ability to remove request trace logs from http_endpoint input. {pull}40005[40005]
- Add ability to remove request trace logs from entityanalytics input. {pull}40004[40004]
Expand Down
7 changes: 7 additions & 0 deletions x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -702,6 +702,13 @@ Please see https://docs.aws.amazon.com/AmazonS3/latest/userguide/ways-to-add-not
for more details. SQS queue will be configured as a
https://docs.aws.amazon.com/sns/latest/dg/sns-sqs-as-subscriber.html[subscriber to the SNS topic].

[float]
=== S3 -> EventBridge -> SQS setup
Amazon S3 can alternatively https://docs.aws.amazon.com/AmazonS3/latest/userguide/EventBridge.html[send events to EventBridge],
which can then be used to route these events to SQS. While the S3 input will
filter for 'Object Created' events it's more efficient to configure EventBridge
to only forward the 'Object Created' events.

[float]
=== Parallel Processing

Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

71 changes: 71 additions & 0 deletions x-pack/filebeat/input/awss3/_meta/terraform/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -147,3 +147,74 @@ resource "aws_sns_topic_subscription" "filebeat-integtest-sns" {
protocol = "sqs"
endpoint = aws_sqs_queue.filebeat-integtest-sns.arn
}

resource "aws_s3_bucket" "filebeat-integtest-eventbridge" {
bucket = "filebeat-s3-integtest-eventbridge-${random_string.random.result}"
force_destroy = true
}

resource "aws_sqs_queue" "filebeat-integtest-eventbridge" {
name = "filebeat-s3-integtest-eventbridge-${random_string.random.result}"
}

data "aws_iam_policy_document" "sqs_queue_policy" {
statement {
effect = "Allow"
actions = ["sqs:SendMessage"]

principals {
type = "Service"
identifiers = ["events.amazonaws.com"]
}

resources = [aws_sqs_queue.filebeat-integtest-eventbridge.arn]
}
}

resource "aws_sqs_queue_policy" "filebeat-integtest-eventbridge" {
queue_url = aws_sqs_queue.filebeat-integtest-eventbridge.id
policy = data.aws_iam_policy_document.sqs_queue_policy.json
}

resource "aws_cloudwatch_event_rule" "sqs" {
name = "capture-s3-notification"
description = "Capture s3 changes"

event_pattern = jsonencode({
source = [
"aws.s3"
],
detail-type = [
"Object Created"
]
detail = {
bucket = {
name = [ aws_s3_bucket.filebeat-integtest-eventbridge.id ]
}
}
})

depends_on = [
aws_s3_bucket.filebeat-integtest-eventbridge
]
}

resource "aws_cloudwatch_event_target" "sqs" {
rule = aws_cloudwatch_event_rule.sqs.name
target_id = "SendToSQS"
arn = aws_sqs_queue.filebeat-integtest-eventbridge.arn

depends_on = [
aws_cloudwatch_event_rule.sqs
]
}

resource "aws_s3_bucket_notification" "bucket_notification-eventbridge" {
bucket = aws_s3_bucket.filebeat-integtest-eventbridge.id
eventbridge = true

depends_on = [
aws_cloudwatch_event_target.sqs
]
}

2 changes: 2 additions & 0 deletions x-pack/filebeat/input/awss3/_meta/terraform/outputs.tf
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ resource "local_file" "secrets" {
"bucket_name" : aws_s3_bucket.filebeat-integtest.id
"bucket_name_for_sns" : aws_s3_bucket.filebeat-integtest-sns.id
"queue_url_for_sns" : aws_sqs_queue.filebeat-integtest-sns.url
"bucket_name_for_eventbridge" : aws_s3_bucket.filebeat-integtest-eventbridge.id
"queue_url_for_eventbridge" : aws_sqs_queue.filebeat-integtest-eventbridge.url
})
filename = "${path.module}/outputs.yml"
file_permission = "0644"
Expand Down
47 changes: 26 additions & 21 deletions x-pack/filebeat/input/awss3/input_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,27 +46,29 @@ type constantSQS struct {

var _ sqsAPI = (*constantSQS)(nil)

func newConstantSQS() *constantSQS {
return &constantSQS{
msgs: []sqsTypes.Message{
newSQSMessage(newS3Event(filepath.Base(cloudtrailTestFile))),
},
func newConstantSQS() (*constantSQS, error) {
event, err := newSQSMessage(newS3Event(filepath.Base(cloudtrailTestFile)))
if err != nil {
return nil, err
}
return &constantSQS{
msgs: []sqsTypes.Message{event},
}, nil
}

func (c *constantSQS) ReceiveMessage(ctx context.Context, maxMessages int) ([]sqsTypes.Message, error) {
func (c *constantSQS) ReceiveMessage(context.Context, int) ([]sqsTypes.Message, error) {
return c.msgs, nil
}

func (*constantSQS) DeleteMessage(ctx context.Context, msg *sqsTypes.Message) error {
func (*constantSQS) DeleteMessage(context.Context, *sqsTypes.Message) error {
return nil
}

func (*constantSQS) ChangeMessageVisibility(ctx context.Context, msg *sqsTypes.Message, timeout time.Duration) error {
func (*constantSQS) ChangeMessageVisibility(context.Context, *sqsTypes.Message, time.Duration) error {
return nil
}

func (c *constantSQS) GetQueueAttributes(ctx context.Context, attr []sqsTypes.QueueAttributeName) (map[string]string, error) {
func (c *constantSQS) GetQueueAttributes(context.Context, []sqsTypes.QueueAttributeName) (map[string]string, error) {
return map[string]string{}, nil
}

Expand All @@ -84,7 +86,7 @@ func (c *s3PagerConstant) HasMorePages() bool {
return c.currentIndex < len(c.objects)
}

func (c *s3PagerConstant) NextPage(ctx context.Context, optFns ...func(*s3.Options)) (*s3.ListObjectsV2Output, error) {
func (c *s3PagerConstant) NextPage(context.Context, ...func(*s3.Options)) (*s3.ListObjectsV2Output, error) {
if !c.HasMorePages() {
return nil, errors.New("no more pages")
}
Expand Down Expand Up @@ -143,19 +145,19 @@ func newConstantS3(t testing.TB) *constantS3 {
}
}

func (c constantS3) GetObject(ctx context.Context, _, bucket, key string) (*s3.GetObjectOutput, error) {
func (c constantS3) GetObject(context.Context, string, string, string) (*s3.GetObjectOutput, error) {
return newS3GetObjectResponse(c.filename, c.data, c.contentType), nil
}

func (c constantS3) CopyObject(ctx context.Context, _, from_bucket, to_bucket, from_key, to_key string) (*s3.CopyObjectOutput, error) {
func (c constantS3) CopyObject(context.Context, string, string, string, string, string) (*s3.CopyObjectOutput, error) {
return nil, nil
}

func (c constantS3) DeleteObject(ctx context.Context, _, bucket, key string) (*s3.DeleteObjectOutput, error) {
func (c constantS3) DeleteObject(context.Context, string, string, string) (*s3.DeleteObjectOutput, error) {
return nil, nil
}

func (c constantS3) ListObjectsPaginator(bucket, prefix string) s3Pager {
func (c constantS3) ListObjectsPaginator(string, string) s3Pager {
return c.pagerConstant
}

Expand All @@ -164,7 +166,7 @@ var _ beat.Pipeline = (*fakePipeline)(nil)
// fakePipeline returns new ackClients.
type fakePipeline struct{}

func (c *fakePipeline) ConnectWith(clientConfig beat.ClientConfig) (beat.Client, error) {
func (c *fakePipeline) ConnectWith(beat.ClientConfig) (beat.Client, error) {
return &ackClient{}, nil
}

Expand Down Expand Up @@ -211,12 +213,13 @@ func benchmarkInputSQS(t *testing.T, maxMessagesInflight int) testing.BenchmarkR
var err error
pipeline := &fakePipeline{}

conf := makeBenchmarkConfig(t)
conf.MaxNumberOfMessages = maxMessagesInflight
sqsReader := newSQSReaderInput(conf, aws.Config{})
config := makeBenchmarkConfig(t)
config.MaxNumberOfMessages = maxMessagesInflight
sqsReader := newSQSReaderInput(config, aws.Config{})
sqsReader.log = log.Named("sqs")
sqsReader.metrics = newInputMetrics("test_id", monitoring.NewRegistry(), maxMessagesInflight)
sqsReader.sqs = newConstantSQS()
sqsReader.sqs, err = newConstantSQS()
require.NoError(t, err)
sqsReader.s3 = newConstantS3(t)
sqsReader.msgHandler, err = sqsReader.createEventProcessor(pipeline)
require.NoError(t, err, "createEventProcessor must succeed")
Expand Down Expand Up @@ -252,7 +255,8 @@ func benchmarkInputSQS(t *testing.T, maxMessagesInflight int) testing.BenchmarkR
}

func TestBenchmarkInputSQS(t *testing.T) {
logp.TestingSetup(logp.WithLevel(logp.InfoLevel))
err := logp.TestingSetup(logp.WithLevel(logp.InfoLevel))
require.NoError(t, err)

results := []testing.BenchmarkResult{
benchmarkInputSQS(t, 1),
Expand Down Expand Up @@ -388,7 +392,8 @@ func benchmarkInputS3(t *testing.T, numberOfWorkers int) testing.BenchmarkResult
}

func TestBenchmarkInputS3(t *testing.T) {
logp.TestingSetup(logp.WithLevel(logp.InfoLevel))
err := logp.TestingSetup(logp.WithLevel(logp.InfoLevel))
require.NoError(t, err)

results := []testing.BenchmarkResult{
benchmarkInputS3(t, 1),
Expand Down
Loading

0 comments on commit c37159e

Please sign in to comment.