Skip to content

Commit

Permalink
Add vars in modules.d/aws.yml.disabled (#27454) (#27462)
Browse files Browse the repository at this point in the history
* Add vars in modules.d/aws.yml.disabled
* missing metadata
* rename bucket to bucket_arn

(cherry picked from commit b4ecc29)

Co-authored-by: Andrea Spacca <andrea.spacca@elastic.co>
  • Loading branch information
mergify[bot] and Andrea Spacca authored Aug 18, 2021
1 parent 8afc67f commit 56156d9
Show file tree
Hide file tree
Showing 26 changed files with 227 additions and 59 deletions.
12 changes: 6 additions & 6 deletions filebeat/docs/modules/aws.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ Example config:
cloudtrail:
enabled: false
#var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue
#var.bucket: 'arn:aws:s3:::mybucket'
#var.bucket_arn: 'arn:aws:s3:::mybucket'
#var.bucket_list_interval: 300s
#var.number_of_workers: 5
#var.shared_credential_file: /etc/filebeat/aws_credentials
Expand All @@ -64,7 +64,7 @@ Example config:
cloudwatch:
enabled: false
#var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue
#var.bucket: 'arn:aws:s3:::mybucket'
#var.bucket_arn: 'arn:aws:s3:::mybucket'
#var.bucket_list_interval: 300s
#var.number_of_workers: 5
#var.shared_credential_file: /etc/filebeat/aws_credentials
Expand All @@ -81,7 +81,7 @@ Example config:
ec2:
enabled: false
#var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue
#var.bucket: 'arn:aws:s3:::mybucket'
#var.bucket_arn: 'arn:aws:s3:::mybucket'
#var.bucket_list_interval: 300s
#var.number_of_workers: 5
#var.shared_credential_file: /etc/filebeat/aws_credentials
Expand All @@ -98,7 +98,7 @@ Example config:
elb:
enabled: false
#var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue
#var.bucket: 'arn:aws:s3:::mybucket'
#var.bucket_arn: 'arn:aws:s3:::mybucket'
#var.bucket_list_interval: 300s
#var.number_of_workers: 5
#var.shared_credential_file: /etc/filebeat/aws_credentials
Expand All @@ -115,7 +115,7 @@ Example config:
s3access:
enabled: false
#var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue
#var.bucket: 'arn:aws:s3:::mybucket'
#var.bucket_arn: 'arn:aws:s3:::mybucket'
#var.bucket_list_interval: 300s
#var.number_of_workers: 5
#var.shared_credential_file: /etc/filebeat/aws_credentials
Expand All @@ -132,7 +132,7 @@ Example config:
vpcflow:
enabled: false
#var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue
#var.bucket: 'arn:aws:s3:::mybucket'
#var.bucket_arn: 'arn:aws:s3:::mybucket'
#var.bucket_list_interval: 300s
#var.number_of_workers: 5
#var.shared_credential_file: /etc/filebeat/aws_credentials
Expand Down
54 changes: 54 additions & 0 deletions x-pack/filebeat/filebeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,15 @@ filebeat.modules:
# AWS SQS queue url
#var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue

# AWS S3 bucket arn
#var.bucket_arn: 'arn:aws:s3:::mybucket'

# Bucket list interval on S3 bucket
#var.bucket_list_interval: 300s

# Number of workers on S3 bucket
#var.number_of_workers: 5

# Process CloudTrail logs
# default is true, set to false to skip Cloudtrail logs
# var.process_cloudtrail_logs: false
Expand Down Expand Up @@ -154,6 +163,15 @@ filebeat.modules:
# AWS SQS queue url
#var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue

# AWS S3 bucket arn
#var.bucket_arn: 'arn:aws:s3:::mybucket'

# Bucket list interval on S3 bucket
#var.bucket_list_interval: 300s

# Number of workers on S3 bucket
#var.number_of_workers: 5

# Filename of AWS credential file
# If not set "$HOME/.aws/credentials" is used on Linux/Mac
# "%UserProfile%\.aws\credentials" is used on Windows
Expand Down Expand Up @@ -194,6 +212,15 @@ filebeat.modules:
# AWS SQS queue url
#var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue

# AWS S3 bucket arn
#var.bucket_arn: 'arn:aws:s3:::mybucket'

# Bucket list interval on S3 bucket
#var.bucket_list_interval: 300s

# Number of workers on S3 bucket
#var.number_of_workers: 5

# Filename of AWS credential file
# If not set "$HOME/.aws/credentials" is used on Linux/Mac
# "%UserProfile%\.aws\credentials" is used on Windows
Expand Down Expand Up @@ -234,6 +261,15 @@ filebeat.modules:
# AWS SQS queue url
#var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue

# AWS S3 bucket arn
#var.bucket_arn: 'arn:aws:s3:::mybucket'

# Bucket list interval on S3 bucket
#var.bucket_list_interval: 300s

# Number of workers on S3 bucket
#var.number_of_workers: 5

# Filename of AWS credential file
# If not set "$HOME/.aws/credentials" is used on Linux/Mac
# "%UserProfile%\.aws\credentials" is used on Windows
Expand Down Expand Up @@ -274,6 +310,15 @@ filebeat.modules:
# AWS SQS queue url
#var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue

# AWS S3 bucket arn
#var.bucket_arn: 'arn:aws:s3:::mybucket'

# Bucket list interval on S3 bucket
#var.bucket_list_interval: 300s

# Number of workers on S3 bucket
#var.number_of_workers: 5

# Filename of AWS credential file
# If not set "$HOME/.aws/credentials" is used on Linux/Mac
# "%UserProfile%\.aws\credentials" is used on Windows
Expand Down Expand Up @@ -314,6 +359,15 @@ filebeat.modules:
# AWS SQS queue url
#var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue

# AWS S3 bucket arn
#var.bucket_arn: 'arn:aws:s3:::mybucket'

# Bucket list interval on S3 bucket
#var.bucket_list_interval: 300s

# Number of workers on S3 bucket
#var.number_of_workers: 5

# Filename of AWS credential file
# If not set "$HOME/.aws/credentials" is used on Linux/Mac
# "%UserProfile%\.aws\credentials" is used on Windows
Expand Down
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/awss3/acker.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (a *eventACKTracker) ACK() {

// Wait waits for the number of pending ACKs to be zero.
// Wait must be called sequentially only after every expected
// Add call are made. Failing to do so could reset the pendingACKs
// `Add` calls are made. Failing to do so could reset the pendingACKs
// property to 0 and would results in Wait returning after additional
// calls to `Add` are made without a corresponding `ACK` call.
func (a *eventACKTracker) Wait() {
Expand Down
16 changes: 8 additions & 8 deletions x-pack/filebeat/input/awss3/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type config struct {
FIPSEnabled bool `config:"fips_enabled"`
MaxNumberOfMessages int `config:"max_number_of_messages"`
QueueURL string `config:"queue_url"`
Bucket string `config:"bucket"`
BucketARN string `config:"bucket_arn"`
BucketListInterval time.Duration `config:"bucket_list_interval"`
NumberOfWorkers int `config:"number_of_workers"`
AWSConfig awscommon.ConfigAWS `config:",inline"`
Expand All @@ -49,20 +49,20 @@ func defaultConfig() config {
}

func (c *config) Validate() error {
if c.QueueURL == "" && c.Bucket == "" {
return fmt.Errorf("queue_url or bucket must provided")
if c.QueueURL == "" && c.BucketARN == "" {
return fmt.Errorf("queue_url or bucket_arn must be provided")
}

if c.QueueURL != "" && c.Bucket != "" {
return fmt.Errorf("queue_url <%v> and bucket <%v> "+
"cannot be set at the same time", c.QueueURL, c.Bucket)
if c.QueueURL != "" && c.BucketARN != "" {
return fmt.Errorf("queue_url <%v> and bucket_arn <%v> "+
"cannot be set at the same time", c.QueueURL, c.BucketARN)
}

if c.Bucket != "" && c.BucketListInterval <= 0 {
if c.BucketARN != "" && c.BucketListInterval <= 0 {
return fmt.Errorf("bucket_list_interval <%v> must be greater than 0", c.BucketListInterval)
}

if c.Bucket != "" && c.NumberOfWorkers <= 0 {
if c.BucketARN != "" && c.NumberOfWorkers <= 0 {
return fmt.Errorf("number_of_workers <%v> must be greater than 0", c.NumberOfWorkers)
}

Expand Down
22 changes: 11 additions & 11 deletions x-pack/filebeat/input/awss3/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func TestConfig(t *testing.T) {
require.NoError(t, parserConf.Unpack(common.MustNewConfigFrom("")))
return config{
QueueURL: quequeURL,
Bucket: s3Bucket,
BucketARN: s3Bucket,
APITimeout: 120 * time.Second,
VisibilityTimeout: 300 * time.Second,
SQSMaxReceiveCount: 5,
Expand Down Expand Up @@ -69,7 +69,7 @@ func TestConfig(t *testing.T) {
"",
s3Bucket,
common.MapStr{
"bucket": s3Bucket,
"bucket_arn": s3Bucket,
"number_of_workers": 5,
},
"",
Expand Down Expand Up @@ -109,21 +109,21 @@ func TestConfig(t *testing.T) {
"",
"",
common.MapStr{
"queue_url": "",
"bucket": "",
"queue_url": "",
"bucket_arn": "",
},
"queue_url or bucket must provided",
"queue_url or bucket_arn must be provided",
nil,
},
{
"error on both queueURL and s3Bucket",
queueURL,
s3Bucket,
common.MapStr{
"queue_url": queueURL,
"bucket": s3Bucket,
"queue_url": queueURL,
"bucket_arn": s3Bucket,
},
"queue_url <https://example.com> and bucket <arn:aws:s3:::aBucket> cannot be set at the same time",
"queue_url <https://example.com> and bucket_arn <arn:aws:s3:::aBucket> cannot be set at the same time",
nil,
},
{
Expand Down Expand Up @@ -164,7 +164,7 @@ func TestConfig(t *testing.T) {
"",
s3Bucket,
common.MapStr{
"bucket": s3Bucket,
"bucket_arn": s3Bucket,
"bucket_list_interval": "0",
},
"bucket_list_interval <0s> must be greater than 0",
Expand All @@ -175,7 +175,7 @@ func TestConfig(t *testing.T) {
"",
s3Bucket,
common.MapStr{
"bucket": s3Bucket,
"bucket_arn": s3Bucket,
"number_of_workers": "0",
},
"number_of_workers <0> must be greater than 0",
Expand Down Expand Up @@ -231,7 +231,7 @@ func TestConfig(t *testing.T) {
"",
s3Bucket,
common.MapStr{
"bucket": s3Bucket,
"bucket_arn": s3Bucket,
"expand_event_list_from_field": "Records",
"content_type": "text/plain",
},
Expand Down
7 changes: 4 additions & 3 deletions x-pack/filebeat/input/awss3/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func (in *s3Input) Run(inputContext v2.Context, pipeline beat.Pipeline) error {
}
}

if in.config.Bucket != "" {
if in.config.BucketARN != "" {
// Create S3 receiver and S3 notification processor.
poller, err := in.createS3Lister(inputContext, client, persistentStore, states)
if err != nil {
Expand Down Expand Up @@ -203,7 +203,7 @@ func (in *s3Input) createS3Lister(ctx v2.Context, client beat.Client, persistent
client: s3.New(awscommon.EnrichAWSConfigWithEndpoint(in.config.AWSConfig.Endpoint, s3ServiceName, in.awsConfig.Region, in.awsConfig)),
}

log := ctx.Logger.With("s3_bucket", in.config.Bucket)
log := ctx.Logger.With("bucket_arn", in.config.BucketARN)
log.Infof("number_of_workers is set to %v.", in.config.NumberOfWorkers)
log.Infof("bucket_list_interval is set to %v.", in.config.BucketListInterval)
log.Infof("AWS region is set to %v.", in.awsConfig.Region)
Expand All @@ -223,7 +223,8 @@ func (in *s3Input) createS3Lister(ctx v2.Context, client beat.Client, persistent
s3EventHandlerFactory,
states,
persistentStore,
in.config.Bucket,
in.config.BucketARN,
in.awsConfig.Region,
in.config.NumberOfWorkers,
in.config.BucketListInterval)

Expand Down
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/awss3/input_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ func benchmarkInputS3(t *testing.T, numberOfWorkers int) testing.BenchmarkResult
}

s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), metrics, s3API, client, conf.FileSelectors)
s3Poller := newS3Poller(logp.NewLogger(inputName), metrics, s3API, s3EventHandlerFactory, newStates(inputCtx), store, "bucket", numberOfWorkers, time.Second)
s3Poller := newS3Poller(logp.NewLogger(inputName), metrics, s3API, s3EventHandlerFactory, newStates(inputCtx), store, "bucket", "region", numberOfWorkers, time.Second)

ctx, cancel := context.WithCancel(context.Background())
b.Cleanup(cancel)
Expand Down
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/awss3/input_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func getTerraformOutputs(t *testing.T) terraformOutputData {

func makeTestConfigS3(s3bucket string) *common.Config {
return common.MustNewConfigFrom(fmt.Sprintf(`---
bucket: aws:s3:::%s
bucket_arn: aws:s3:::%s
number_of_workers: 1
file_selectors:
-
Expand Down
5 changes: 5 additions & 0 deletions x-pack/filebeat/input/awss3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type s3ObjectPayload struct {
type s3Poller struct {
numberOfWorkers int
bucket string
region string
bucketPollInterval time.Duration
workerSem *sem
s3 s3API
Expand All @@ -60,6 +61,7 @@ func newS3Poller(log *logp.Logger,
states *states,
store *statestore.Store,
bucket string,
awsRegion string,
numberOfWorkers int,
bucketPollInterval time.Duration) *s3Poller {
if metrics == nil {
Expand All @@ -68,6 +70,7 @@ func newS3Poller(log *logp.Logger,
return &s3Poller{
numberOfWorkers: numberOfWorkers,
bucket: bucket,
region: awsRegion,
bucketPollInterval: bucketPollInterval,
workerSem: newSem(numberOfWorkers),
s3: s3,
Expand Down Expand Up @@ -178,7 +181,9 @@ func (p *s3Poller) GetS3Objects(ctx context.Context, s3ObjectPayloadChan chan<-
p.states.Update(state, "")

event := s3EventV2{}
event.AWSRegion = p.region
event.S3.Bucket.Name = bucketName
event.S3.Bucket.ARN = p.bucket
event.S3.Object.Key = filename

acker := newEventACKTracker(ctx)
Expand Down
4 changes: 2 additions & 2 deletions x-pack/filebeat/input/awss3/s3_objects.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ func (f *s3ObjectProcessorFactory) findReaderConfig(key string) *readerConfig {
// match the S3 object key.
func (f *s3ObjectProcessorFactory) Create(ctx context.Context, log *logp.Logger, ack *eventACKTracker, obj s3EventV2) s3ObjectHandler {
log = log.With(
"s3_bucket", obj.S3.Bucket.Name,
"s3_object", obj.S3.Object.Key)
"bucket_arn", obj.S3.Bucket.Name,
"object_key", obj.S3.Object.Key)

readerConfig := f.findReaderConfig(obj.S3.Object.Key)
if readerConfig == nil {
Expand Down
4 changes: 2 additions & 2 deletions x-pack/filebeat/input/awss3/s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func TestS3Poller(t *testing.T) {
Return(nil, errFakeConnectivityFailure)

s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockAPI, mockPublisher, nil)
receiver := newS3Poller(logp.NewLogger(inputName), nil, mockAPI, s3ObjProc, newStates(inputCtx), store, bucket, numberOfWorkers, pollInterval)
receiver := newS3Poller(logp.NewLogger(inputName), nil, mockAPI, s3ObjProc, newStates(inputCtx), store, bucket, "region", numberOfWorkers, pollInterval)
require.Error(t, context.DeadlineExceeded, receiver.Poll(ctx))
assert.Equal(t, numberOfWorkers, receiver.workerSem.available)
})
Expand Down Expand Up @@ -263,7 +263,7 @@ func TestS3Poller(t *testing.T) {
Return(nil, errFakeConnectivityFailure)

s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockAPI, mockPublisher, nil)
receiver := newS3Poller(logp.NewLogger(inputName), nil, mockAPI, s3ObjProc, newStates(inputCtx), store, bucket, numberOfWorkers, pollInterval)
receiver := newS3Poller(logp.NewLogger(inputName), nil, mockAPI, s3ObjProc, newStates(inputCtx), store, bucket, "region", numberOfWorkers, pollInterval)
require.Error(t, context.DeadlineExceeded, receiver.Poll(ctx))
assert.Equal(t, numberOfWorkers, receiver.workerSem.available)
})
Expand Down
Loading

0 comments on commit 56156d9

Please sign in to comment.