
[Filebeat] Add option for S3 input to work without SQS notification #27332

Merged: 21 commits, Aug 17, 2021
42 changes: 24 additions & 18 deletions filebeat/docs/modules/aws.asciidoc
@@ -49,8 +49,9 @@ Example config:
cloudtrail:
enabled: false
#var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue
#var.s3_bucket: 'arn:aws:s3:::mybucket
#var.s3_bucket_poll_interval: 300s
#var.bucket: 'arn:aws:s3:::mybucket
Contributor suggested change:
#var.bucket: 'arn:aws:s3:::mybucket
#var.bucket: 'arn:aws:s3:::mybucket'

Contributor: and same for the other sections below.

#var.bucket_list_interval: 300s
#var.number_of_workers: 5
#var.shared_credential_file: /etc/filebeat/aws_credentials
#var.credential_profile_name: fb-aws
#var.access_key_id: access_key_id
@@ -65,8 +66,9 @@ Example config:
cloudwatch:
enabled: false
#var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue
#var.s3_bucket: 'arn:aws:s3:::mybucket
#var.s3_bucket_poll_interval: 300s
#var.bucket: 'arn:aws:s3:::mybucket
#var.bucket_list_interval: 300s
#var.number_of_workers: 5
#var.shared_credential_file: /etc/filebeat/aws_credentials
#var.credential_profile_name: fb-aws
#var.access_key_id: access_key_id
@@ -81,8 +83,9 @@ Example config:
ec2:
enabled: false
#var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue
#var.s3_bucket: 'arn:aws:s3:::mybucket
#var.s3_bucket_poll_interval: 300s
#var.bucket: 'arn:aws:s3:::mybucket
#var.bucket_list_interval: 300s
#var.number_of_workers: 5
#var.shared_credential_file: /etc/filebeat/aws_credentials
#var.credential_profile_name: fb-aws
#var.access_key_id: access_key_id
@@ -97,8 +100,9 @@ Example config:
elb:
enabled: false
#var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue
#var.s3_bucket: 'arn:aws:s3:::mybucket
#var.s3_bucket_poll_interval: 300s
#var.bucket: 'arn:aws:s3:::mybucket
#var.bucket_list_interval: 300s
#var.number_of_workers: 5
#var.shared_credential_file: /etc/filebeat/aws_credentials
#var.credential_profile_name: fb-aws
#var.access_key_id: access_key_id
@@ -113,8 +117,9 @@ Example config:
s3access:
enabled: false
#var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue
#var.s3_bucket: 'arn:aws:s3:::mybucket
#var.s3_bucket_poll_interval: 300s
#var.bucket: 'arn:aws:s3:::mybucket
#var.bucket_list_interval: 300s
#var.number_of_workers: 5
#var.shared_credential_file: /etc/filebeat/aws_credentials
#var.credential_profile_name: fb-aws
#var.access_key_id: access_key_id
@@ -129,8 +134,9 @@ Example config:
vpcflow:
enabled: false
#var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue
#var.s3_bucket: 'arn:aws:s3:::mybucket
#var.s3_bucket_poll_interval: 300s
#var.bucket: 'arn:aws:s3:::mybucket
#var.bucket_list_interval: 300s
#var.number_of_workers: 5
#var.shared_credential_file: /etc/filebeat/aws_credentials
#var.credential_profile_name: fb-aws
#var.access_key_id: access_key_id
@@ -145,7 +151,7 @@ Example config:

*`var.queue_url`*::

AWS SQS queue url (Required when `var.s3_bucket` is not set).
AWS SQS queue URL (Required when `var.bucket` is not set).

*`var.visibility_timeout`*::

@@ -156,18 +162,18 @@ Default to be 300 seconds.

Maximum duration before AWS API request will be interrupted. Default to be 120 seconds.

*`var.s3_bucket`*::
*`var.bucket`*::

AWS S3 bucket ARN (Required when `var.queue_url` is not set).

*`var.s3_bucket_number_of_workers`*::
*`var.number_of_workers`*::

Number of workers that will process the S3 objects listed (Required when `var.s3_bucket` is set).
Number of workers that will process the S3 objects listed (Required when `var.bucket` is set).
Use to vertically scale the input.

*`var.s3_bucket_poll_interval`*::
*`var.bucket_list_interval`*::

Interval between list requests to the S3 bucket. Default to be 120 seconds.
Wait interval between completion of a list request to the S3 bucket and beginning of the nest one. Default to be 120 seconds.
Member suggested change:
Wait interval between completion of a list request to the S3 bucket and beginning of the nest one. Default to be 120 seconds.
Wait interval between completion of a list request to the S3 bucket and beginning of the next one. Default to be 120 seconds.


*`var.endpoint`*::

17 changes: 10 additions & 7 deletions x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc
@@ -46,17 +46,17 @@ be stopped and the SQS message will be returned back to the queue.

When using direct polling of the list of S3 objects in an S3 bucket,
the number of workers that will process the listed S3 objects must be set
through the `s3_bucket_number_of_workers` config.
through the `number_of_workers` config.
Listing of the S3 bucket will be polled according to the time interval defined by the
`s3_bucket_poll_interval` config. Default value is 120secs.
`bucket_list_interval` config. Default value is 120 seconds.

["source","yaml",subs="attributes"]
----
{beatname_lc}.inputs:
- type: aws-s3
bucket: arn:aws:s3:::test-s3-bucket
s3_bucket_number_of_workers: 5
s3_bucket_poll_interval: 300s
number_of_workers: 5
bucket_list_interval: 300s
credential_profile_name: elastic-beats
expand_event_list_from_field: Records
----
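For comparison, the pre-existing SQS-notification mode is selected by setting `queue_url` instead of a bucket ARN. A minimal sketch, with an illustrative queue URL and profile name:

["source","yaml",subs="attributes"]
----
{beatname_lc}.inputs:
- type: aws-s3
  queue_url: https://sqs.us-east-1.amazonaws.com/123456/myqueue
  visibility_timeout: 300s
  credential_profile_name: elastic-beats
----

The two modes are mutually exclusive: the validation added in `config.go` rejects configurations where both `queue_url` and `bucket` are set.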
@@ -273,13 +273,13 @@ value is `20s`.
ARN of the AWS S3 bucket that will be polled for list operation. (Required when `queue_url` is not set).

[float]
==== `s3_bucket_poll_interval`
==== `bucket_list_interval`

Time interval for polling the listing of the S3 bucket. Defaults to `120s`.


[float]
==== `s3_bucket_number_of_workers`
==== `number_of_workers`

Number of workers that will process the S3 objects listed. (Required when `bucket` is set).

@@ -349,7 +349,7 @@ they can list the same S3 bucket at the same time. Since the state of the ingest
and multiple {beatname_uc} cannot share the same `path.data` this will produce repeated
ingestion of the S3 object.
Therefore, when using the polling list of S3 bucket objects method, scaling should be
vertical, with a single bigger {beatname_uc} instance and higher `s3_bucket_number_of_workers`
vertical, with a single bigger {beatname_uc} instance and higher `number_of_workers`
config value.


@@ -370,6 +370,9 @@ observe the activity of the input.
| `sqs_messages_deleted_total` | Number of SQS messages deleted.
| `sqs_message_processing_time` | Histogram of the elapsed SQS processing times in nanoseconds (time of receipt to time of delete/return).
| `s3_objects_requested_total` | Number of S3 objects downloaded.
| `s3_objects_listed_total` | Number of S3 objects returned by list operations.
| `s3_objects_processed_total` | Number of S3 objects that matched file_selectors rules.
| `s3_objects_acked_total` | Number of S3 objects processed that were fully ACKed.
| `s3_bytes_processed_total` | Number of S3 bytes processed.
| `s3_events_created_total` | Number of events created from processing S3 data.
| `s3_objects_inflight_gauge` | Number of S3 objects inflight (gauge).
6 changes: 3 additions & 3 deletions x-pack/filebeat/input/awss3/_meta/terraform/README.md
@@ -16,13 +16,13 @@ before running Terraform or the integration tests. The AWS key must be
authorized to create and destroy S3 buckets and SQS queues.

1. Execute terraform in this directory to create the resources. This will also
write the `outputs.yml`.
write the `outputs.yml`. You can set `export TF_VAR_aws_region=NNNNN` to
match the AWS region of the profile you are using.

`terraform apply`


2. View the output configuration and assure the region match in the aws profile used to run
the test or to set the environment variable `AWS_REGION` to the value in the output.
2. (Optional) View the output configuration.

```yaml
"aws_region": "us-east-1"
6 changes: 5 additions & 1 deletion x-pack/filebeat/input/awss3/acker.go
@@ -28,7 +28,7 @@ func newEventACKTracker(ctx context.Context) *eventACKTracker {
return &eventACKTracker{ctx: ctx, cancel: cancel}
}

// Add increments the number of pending ACKs
// Add increments the number of pending ACKs.
func (a *eventACKTracker) Add() {
a.Lock()
a.pendingACKs++
@@ -51,6 +51,10 @@ func (a *eventACKTracker) ACK() {
}

// Wait waits for the number of pending ACKs to be zero.
// Wait must be called sequentially, and only after all expected
// Add calls are made. Failing to do so could reset the pendingACKs
// property to 0 and would result in Wait returning while additional
// calls to `Add` are made without a corresponding `ACK` call.
func (a *eventACKTracker) Wait() {
// If there were never any pending ACKs then cancel the context. (This can
// happen when a document contains no events or cannot be read due to an error).
15 changes: 15 additions & 0 deletions x-pack/filebeat/input/awss3/acker_test.go
@@ -52,3 +52,18 @@ func TestEventACKHandler(t *testing.T) {
assert.EqualValues(t, 0, acker.pendingACKs)
assert.ErrorIs(t, acker.ctx.Err(), context.Canceled)
}

func TestEventACKHandlerWait(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

// Create acker. Add one pending ACK, ACK it, Wait, then Add another.
acker := newEventACKTracker(ctx)
acker.Add()
acker.ACK()
acker.Wait()
acker.Add()

assert.EqualValues(t, 1, acker.pendingACKs)
assert.ErrorIs(t, acker.ctx.Err(), context.Canceled)
}
59 changes: 29 additions & 30 deletions x-pack/filebeat/input/awss3/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,52 +19,51 @@ import (
)

type config struct {
APITimeout time.Duration `config:"api_timeout"`
VisibilityTimeout time.Duration `config:"visibility_timeout"`
SQSWaitTime time.Duration `config:"sqs.wait_time"` // The max duration for which the SQS ReceiveMessage call waits for a message to arrive in the queue before returning.
SQSMaxReceiveCount int `config:"sqs.max_receive_count"` // The max number of times a message should be received (retried) before deleting it.
FIPSEnabled bool `config:"fips_enabled"`
MaxNumberOfMessages int `config:"max_number_of_messages"`
QueueURL string `config:"queue_url"`
S3Bucket string `config:"s3_bucket"`
S3BucketPollInterval time.Duration `config:"s3_bucket_poll_interval"`
S3BucketNumberOfWorkers int `config:"s3_bucket_number_of_workers"`
AWSConfig awscommon.ConfigAWS `config:",inline"`
FileSelectors []fileSelectorConfig `config:"file_selectors"`
ReaderConfig readerConfig `config:",inline"` // Reader options to apply when no file_selectors are used.
APITimeout time.Duration `config:"api_timeout"`
VisibilityTimeout time.Duration `config:"visibility_timeout"`
SQSWaitTime time.Duration `config:"sqs.wait_time"` // The max duration for which the SQS ReceiveMessage call waits for a message to arrive in the queue before returning.
SQSMaxReceiveCount int `config:"sqs.max_receive_count"` // The max number of times a message should be received (retried) before deleting it.
FIPSEnabled bool `config:"fips_enabled"`
MaxNumberOfMessages int `config:"max_number_of_messages"`
QueueURL string `config:"queue_url"`
Bucket string `config:"bucket"`
BucketListInterval time.Duration `config:"bucket_list_interval"`
NumberOfWorkers int `config:"number_of_workers"`
AWSConfig awscommon.ConfigAWS `config:",inline"`
FileSelectors []fileSelectorConfig `config:"file_selectors"`
ReaderConfig readerConfig `config:",inline"` // Reader options to apply when no file_selectors are used.
}

func defaultConfig() config {
c := config{
APITimeout: 120 * time.Second,
VisibilityTimeout: 300 * time.Second,
S3BucketPollInterval: 120 * time.Second,
SQSWaitTime: 20 * time.Second,
SQSMaxReceiveCount: 5,
FIPSEnabled: false,
MaxNumberOfMessages: 5,
APITimeout: 120 * time.Second,
VisibilityTimeout: 300 * time.Second,
BucketListInterval: 120 * time.Second,
SQSWaitTime: 20 * time.Second,
SQSMaxReceiveCount: 5,
FIPSEnabled: false,
MaxNumberOfMessages: 5,
}
c.ReaderConfig.InitDefaults()
return c
}

func (c *config) Validate() error {
if c.QueueURL == "" && c.S3Bucket == "" {
return fmt.Errorf("queue_url or s3_bucket must provided")
if c.QueueURL == "" && c.Bucket == "" {
return fmt.Errorf("queue_url or bucket must be provided")
}

if c.QueueURL != "" && c.S3Bucket != "" {
return fmt.Errorf("queue_url <%v> and s3_bucket <%v> "+
"cannot be set at the same time", c.QueueURL, c.S3Bucket)
if c.QueueURL != "" && c.Bucket != "" {
return fmt.Errorf("queue_url <%v> and bucket <%v> "+
"cannot be set at the same time", c.QueueURL, c.Bucket)
}

if c.S3Bucket != "" && (c.S3BucketPollInterval <= 0 || c.S3BucketPollInterval.Hours() > 12) {
return fmt.Errorf("s3_bucket_poll_interval <%v> must be greater than 0 and "+
"less than or equal to 12h", c.S3BucketPollInterval)
if c.Bucket != "" && c.BucketListInterval <= 0 {
return fmt.Errorf("bucket_list_interval <%v> must be greater than 0", c.BucketListInterval)
}

if c.S3Bucket != "" && c.S3BucketNumberOfWorkers <= 0 {
return fmt.Errorf("s3_bucket_number_of_workers <%v> must be greater than 0", c.S3BucketNumberOfWorkers)
if c.Bucket != "" && c.NumberOfWorkers <= 0 {
return fmt.Errorf("number_of_workers <%v> must be greater than 0", c.NumberOfWorkers)
}

if c.QueueURL != "" && (c.VisibilityTimeout <= 0 || c.VisibilityTimeout.Hours() > 12) {