Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OOTB support AWS Eventbridge #40006

Merged
merged 19 commits into from
Sep 13, 2024
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
a30a6e5
Adding code to OOTB support AWS Eventbridge generated events for S3 c…
mjmbischoff Jun 25, 2024
5c22b13
running make update and make fmt
mjmbischoff Jun 25, 2024
74fea52
updating test code to return error and call testing.T NoError + fixin…
mjmbischoff Jun 26, 2024
fbda183
Extending input documentation to cover changes
mjmbischoff Jun 26, 2024
d38d96e
updating CHANGELOG.next.asciidoc
mjmbischoff Jun 26, 2024
7487de3
revert change to test_autodiscover.py
mjmbischoff Jun 27, 2024
e5a6336
revert list_init.go
mjmbischoff Jun 27, 2024
dcd1285
Merge branch 'refs/heads/main' into ootb-aws-eventbridge
mjmbischoff Jun 27, 2024
15f3c74
Fixing up / extending tests
mjmbischoff Jul 19, 2024
7cb424d
reverting commenting out localstack variables, tbh localstack part sh…
mjmbischoff Jul 19, 2024
dda551f
Merge branch 'elastic:main' into ootb-aws-eventbridge
mjmbischoff Jul 19, 2024
36d4e6f
Merge branch 'main' into ootb-aws-eventbridge
mjmbischoff Jul 24, 2024
e52d931
Fixing possible array out of bounds. Thank you tests/CI
mjmbischoff Jul 25, 2024
725e865
Merge branch 'elastic:main' into ootb-aws-eventbridge
mjmbischoff Jul 25, 2024
b8ad1dd
Merge branch 'refs/heads/main' into ootb-aws-eventbridge
mjmbischoff Aug 19, 2024
30107a6
Merge branch 'main' into ootb-aws-eventbridge
mjmbischoff Aug 20, 2024
2fd359b
Merge branch 'refs/heads/main' into ootb-aws-eventbridge
mjmbischoff Sep 13, 2024
122c332
Merge remote-tracking branch 'origin/ootb-aws-eventbridge' into ootb-…
mjmbischoff Sep 13, 2024
b4ba7cc
running make update
mjmbischoff Sep 13, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,8 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Allow elision of set and append failure logging. {issue}34544[34544] {pull}39929[39929]
- Add ability to remove request trace logs from CEL input. {pull}39969[39969]
- Add ability to remove request trace logs from HTTPJSON input. {pull}40003[40003]
- Update CEL mito extensions to v1.13.0. {pull}40035[40035]
- Added out of the box support for Amazon EventBridge notifications over SQS to S3 input {pull}40006[40006]
- Update CEL mito extensions to v1.13.0 {pull}40035[40035]
- Add Jamf entity analytics provider. {pull}39996[39996]
- Add ability to remove request trace logs from http_endpoint input. {pull}40005[40005]
- Add ability to remove request trace logs from entityanalytics input. {pull}40004[40004]
Expand Down
424 changes: 212 additions & 212 deletions NOTICE.txt

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ require (
github.com/Azure/azure-storage-blob-go v0.15.0
github.com/Azure/go-autorest/autorest/adal v0.9.24
github.com/apache/arrow/go/v14 v14.0.2
github.com/aws/aws-sdk-go v1.38.60
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.1
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.16.15
github.com/aws/aws-sdk-go-v2/service/cloudformation v1.50.0
Expand Down Expand Up @@ -248,7 +249,6 @@ require (
github.com/apache/arrow/go/v15 v15.0.2 // indirect
github.com/apache/thrift v0.19.0 // indirect
github.com/armon/go-radix v1.0.0 // indirect
github.com/aws/aws-sdk-go v1.38.60 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.2 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.5 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.5 // indirect
Expand Down
7 changes: 7 additions & 0 deletions x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,13 @@ Please see https://docs.aws.amazon.com/AmazonS3/latest/userguide/ways-to-add-not
for more details. SQS queue will be configured as a
https://docs.aws.amazon.com/sns/latest/dg/sns-sqs-as-subscriber.html[subscriber to the SNS topic].

[float]
=== S3 -> EventBridge -> SQS setup
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a thought here: maybe we can add a cloudformation template to help setup s3 -> eventbridge -> sqs?
We are working on adding a template for the s3-sqs setup: #40642
(not required change for this pr)

Amazon S3 can alternatively https://docs.aws.amazon.com/AmazonS3/latest/userguide/EventBridge.html[send events to EventBridge],
which can then be used to route these events to SQS. While the S3 input will
filter for 'Object Created' events it's more efficient to configure EventBridge
to only forward the 'Object Created' events.

[float]
=== Parallel Processing

Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

71 changes: 71 additions & 0 deletions x-pack/filebeat/input/awss3/_meta/terraform/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -147,3 +147,74 @@ resource "aws_sns_topic_subscription" "filebeat-integtest-sns" {
protocol = "sqs"
endpoint = aws_sqs_queue.filebeat-integtest-sns.arn
}

resource "aws_s3_bucket" "filebeat-integtest-eventbridge" {
bucket = "filebeat-s3-integtest-eventbridge-${random_string.random.result}"
force_destroy = true
}

resource "aws_sqs_queue" "filebeat-integtest-eventbridge" {
name = "filebeat-s3-integtest-eventbridge-${random_string.random.result}"
}

data "aws_iam_policy_document" "sqs_queue_policy" {
statement {
effect = "Allow"
actions = ["sqs:SendMessage"]

principals {
type = "Service"
identifiers = ["events.amazonaws.com"]
}

resources = [aws_sqs_queue.filebeat-integtest-eventbridge.arn]
}
}

resource "aws_sqs_queue_policy" "filebeat-integtest-eventbridge" {
queue_url = aws_sqs_queue.filebeat-integtest-eventbridge.id
policy = data.aws_iam_policy_document.sqs_queue_policy.json
}

resource "aws_cloudwatch_event_rule" "sqs" {
name = "capture-s3-notification"
description = "Capture s3 changes"

event_pattern = jsonencode({
source = [
"aws.s3"
],
detail-type = [
"Object Created"
]
detail = {
bucket = {
name = [ aws_s3_bucket.filebeat-integtest-eventbridge.id ]
}
}
})

depends_on = [
aws_s3_bucket.filebeat-integtest-eventbridge
]
}

resource "aws_cloudwatch_event_target" "sqs" {
rule = aws_cloudwatch_event_rule.sqs.name
target_id = "SendToSQS"
arn = aws_sqs_queue.filebeat-integtest-eventbridge.arn

depends_on = [
aws_cloudwatch_event_rule.sqs
]
}

resource "aws_s3_bucket_notification" "bucket_notification-eventbridge" {
bucket = aws_s3_bucket.filebeat-integtest-eventbridge.id
eventbridge = true

depends_on = [
aws_cloudwatch_event_target.sqs
]
}

2 changes: 2 additions & 0 deletions x-pack/filebeat/input/awss3/_meta/terraform/outputs.tf
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ resource "local_file" "secrets" {
"bucket_name" : aws_s3_bucket.filebeat-integtest.id
"bucket_name_for_sns" : aws_s3_bucket.filebeat-integtest-sns.id
"queue_url_for_sns" : aws_sqs_queue.filebeat-integtest-sns.url
"bucket_name_for_eventbridge" : aws_s3_bucket.filebeat-integtest-eventbridge.id
"queue_url_for_eventbridge" : aws_sqs_queue.filebeat-integtest-eventbridge.url
})
filename = "${path.module}/outputs.yml"
file_permission = "0644"
Expand Down
47 changes: 26 additions & 21 deletions x-pack/filebeat/input/awss3/input_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,27 +46,29 @@ type constantSQS struct {

var _ sqsAPI = (*constantSQS)(nil)

func newConstantSQS() *constantSQS {
return &constantSQS{
msgs: []sqsTypes.Message{
newSQSMessage(newS3Event(filepath.Base(cloudtrailTestFile))),
},
func newConstantSQS() (*constantSQS, error) {
event, err := newSQSMessage(newS3Event(filepath.Base(cloudtrailTestFile)))
if err != nil {
return nil, err
}
return &constantSQS{
msgs: []sqsTypes.Message{event},
}, nil
}

func (c *constantSQS) ReceiveMessage(ctx context.Context, maxMessages int) ([]sqsTypes.Message, error) {
func (c *constantSQS) ReceiveMessage(context.Context, int) ([]sqsTypes.Message, error) {
return c.msgs, nil
}

func (*constantSQS) DeleteMessage(ctx context.Context, msg *sqsTypes.Message) error {
func (*constantSQS) DeleteMessage(context.Context, *sqsTypes.Message) error {
return nil
}

func (*constantSQS) ChangeMessageVisibility(ctx context.Context, msg *sqsTypes.Message, timeout time.Duration) error {
func (*constantSQS) ChangeMessageVisibility(context.Context, *sqsTypes.Message, time.Duration) error {
return nil
}

func (c *constantSQS) GetQueueAttributes(ctx context.Context, attr []sqsTypes.QueueAttributeName) (map[string]string, error) {
func (c *constantSQS) GetQueueAttributes(context.Context, []sqsTypes.QueueAttributeName) (map[string]string, error) {
return map[string]string{}, nil
}

Expand All @@ -84,7 +86,7 @@ func (c *s3PagerConstant) HasMorePages() bool {
return c.currentIndex < len(c.objects)
}

func (c *s3PagerConstant) NextPage(ctx context.Context, optFns ...func(*s3.Options)) (*s3.ListObjectsV2Output, error) {
func (c *s3PagerConstant) NextPage(context.Context, ...func(*s3.Options)) (*s3.ListObjectsV2Output, error) {
if !c.HasMorePages() {
return nil, errors.New("no more pages")
}
Expand Down Expand Up @@ -143,19 +145,19 @@ func newConstantS3(t testing.TB) *constantS3 {
}
}

func (c constantS3) GetObject(ctx context.Context, _, bucket, key string) (*s3.GetObjectOutput, error) {
func (c constantS3) GetObject(context.Context, string, string, string) (*s3.GetObjectOutput, error) {
return newS3GetObjectResponse(c.filename, c.data, c.contentType), nil
}

func (c constantS3) CopyObject(ctx context.Context, _, from_bucket, to_bucket, from_key, to_key string) (*s3.CopyObjectOutput, error) {
func (c constantS3) CopyObject(context.Context, string, string, string, string, string) (*s3.CopyObjectOutput, error) {
return nil, nil
}

func (c constantS3) DeleteObject(ctx context.Context, _, bucket, key string) (*s3.DeleteObjectOutput, error) {
func (c constantS3) DeleteObject(context.Context, string, string, string) (*s3.DeleteObjectOutput, error) {
return nil, nil
}

func (c constantS3) ListObjectsPaginator(bucket, prefix string) s3Pager {
func (c constantS3) ListObjectsPaginator(string, string) s3Pager {
return c.pagerConstant
}

Expand All @@ -164,7 +166,7 @@ var _ beat.Pipeline = (*fakePipeline)(nil)
// fakePipeline returns new ackClients.
type fakePipeline struct{}

func (c *fakePipeline) ConnectWith(clientConfig beat.ClientConfig) (beat.Client, error) {
func (c *fakePipeline) ConnectWith(beat.ClientConfig) (beat.Client, error) {
return &ackClient{}, nil
}

Expand Down Expand Up @@ -211,12 +213,13 @@ func benchmarkInputSQS(t *testing.T, maxMessagesInflight int) testing.BenchmarkR
var err error
pipeline := &fakePipeline{}

conf := makeBenchmarkConfig(t)
conf.MaxNumberOfMessages = maxMessagesInflight
sqsReader := newSQSReaderInput(conf, aws.Config{})
config := makeBenchmarkConfig(t)
config.MaxNumberOfMessages = maxMessagesInflight
sqsReader := newSQSReaderInput(config, aws.Config{})
sqsReader.log = log.Named("sqs")
sqsReader.metrics = newInputMetrics("test_id", monitoring.NewRegistry(), maxMessagesInflight)
sqsReader.sqs = newConstantSQS()
sqsReader.sqs, err = newConstantSQS()
require.NoError(t, err)
sqsReader.s3 = newConstantS3(t)
sqsReader.msgHandler, err = sqsReader.createEventProcessor(pipeline)
require.NoError(t, err, "createEventProcessor must succeed")
Expand Down Expand Up @@ -252,7 +255,8 @@ func benchmarkInputSQS(t *testing.T, maxMessagesInflight int) testing.BenchmarkR
}

func TestBenchmarkInputSQS(t *testing.T) {
logp.TestingSetup(logp.WithLevel(logp.InfoLevel))
err := logp.TestingSetup(logp.WithLevel(logp.InfoLevel))
require.NoError(t, err)

results := []testing.BenchmarkResult{
benchmarkInputSQS(t, 1),
Expand Down Expand Up @@ -388,7 +392,8 @@ func benchmarkInputS3(t *testing.T, numberOfWorkers int) testing.BenchmarkResult
}

func TestBenchmarkInputS3(t *testing.T) {
logp.TestingSetup(logp.WithLevel(logp.InfoLevel))
err := logp.TestingSetup(logp.WithLevel(logp.InfoLevel))
require.NoError(t, err)

results := []testing.BenchmarkResult{
benchmarkInputS3(t, 1),
Expand Down
Loading
Loading