[Filebeat] Add option for S3 input to work without SQS notification (#27332)

* [Filebeat] Add option for S3 input to work without SQS notification

(cherry picked from commit 928f9c5)
Andrea Spacca authored and mergify-bot committed Aug 17, 2021
1 parent 2badab2 commit 29cdd57
Showing 38 changed files with 2,798 additions and 655 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
@@ -48,6 +48,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add support for GMT timezone offsets in `decode_cef`. {pull}20993[20993]
- Release Filebeat Stack Monitoring modules as GA {pull}26226[26226]
- Remove all alias fields pointing to ECS fields from modules. This affects the Suricata and Traefik modules. {issue}10535[10535] {pull}26627[26627]
- Add option for S3 input to work without SQS notification {issue}18205[18205] {pull}27332[27332]

*Heartbeat*

44 changes: 40 additions & 4 deletions filebeat/docs/modules/aws.asciidoc
@@ -15,9 +15,14 @@ This file is generated! See scripts/docs_collector.py
beta[]

This is a module for aws logs. It uses filebeat s3 input to get log files from
AWS S3 buckets with SQS notification. This module supports reading s3 server
access logs with `s3access` fileset, ELB access logs with `elb` fileset, VPC
flow logs with `vpcflow` fileset, and CloudTrail logs with `cloudtrail` fileset.
AWS S3 buckets with SQS notification or by directly polling the list of S3 objects in an S3 bucket.
The use of SQS notification is preferred: polling the list of S3 objects is expensive
in terms of performance and cost, cannot scale horizontally without duplicating ingestion,
and should be used only when no SQS notification can be attached to the S3 buckets.

This module supports reading S3 server access logs with `s3access` fileset,
ELB access logs with `elb` fileset, VPC flow logs with `vpcflow` fileset,
and CloudTrail logs with `cloudtrail` fileset.

Access logs contain detailed information about the requests made to these
services. VPC flow logs capture information about the IP traffic going to and
@@ -44,6 +49,9 @@ Example config:
cloudtrail:
enabled: false
#var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue
#var.bucket: 'arn:aws:s3:::mybucket'
#var.bucket_list_interval: 300s
#var.number_of_workers: 5
#var.shared_credential_file: /etc/filebeat/aws_credentials
#var.credential_profile_name: fb-aws
#var.access_key_id: access_key_id
@@ -58,6 +66,9 @@ Example config:
cloudwatch:
enabled: false
#var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue
#var.bucket: 'arn:aws:s3:::mybucket'
#var.bucket_list_interval: 300s
#var.number_of_workers: 5
#var.shared_credential_file: /etc/filebeat/aws_credentials
#var.credential_profile_name: fb-aws
#var.access_key_id: access_key_id
@@ -72,6 +83,9 @@ Example config:
ec2:
enabled: false
#var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue
#var.bucket: 'arn:aws:s3:::mybucket'
#var.bucket_list_interval: 300s
#var.number_of_workers: 5
#var.shared_credential_file: /etc/filebeat/aws_credentials
#var.credential_profile_name: fb-aws
#var.access_key_id: access_key_id
@@ -86,6 +100,9 @@ Example config:
elb:
enabled: false
#var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue
#var.bucket: 'arn:aws:s3:::mybucket'
#var.bucket_list_interval: 300s
#var.number_of_workers: 5
#var.shared_credential_file: /etc/filebeat/aws_credentials
#var.credential_profile_name: fb-aws
#var.access_key_id: access_key_id
@@ -100,6 +117,9 @@ Example config:
s3access:
enabled: false
#var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue
#var.bucket: 'arn:aws:s3:::mybucket'
#var.bucket_list_interval: 300s
#var.number_of_workers: 5
#var.shared_credential_file: /etc/filebeat/aws_credentials
#var.credential_profile_name: fb-aws
#var.access_key_id: access_key_id
@@ -114,6 +134,9 @@ Example config:
vpcflow:
enabled: false
#var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue
#var.bucket: 'arn:aws:s3:::mybucket'
#var.bucket_list_interval: 300s
#var.number_of_workers: 5
#var.shared_credential_file: /etc/filebeat/aws_credentials
#var.credential_profile_name: fb-aws
#var.access_key_id: access_key_id
@@ -128,7 +151,7 @@ Example config:

*`var.queue_url`*::

(Required) AWS SQS queue url.
AWS SQS queue URL (Required when `var.bucket` is not set).

*`var.visibility_timeout`*::

@@ -139,6 +162,19 @@ Default to be 300 seconds.

Maximum duration before an AWS API request is interrupted. Defaults to 120 seconds.

*`var.bucket`*::

AWS S3 bucket ARN (Required when `var.queue_url` is not set).

*`var.number_of_workers`*::

Number of workers that will process the S3 objects listed (Required when `var.bucket` is set).
Use this option to scale the input vertically.

*`var.bucket_list_interval`*::

Wait interval between the completion of a list request to the S3 bucket and the start of the next one. Defaults to 120 seconds.

*`var.endpoint`*::

Custom endpoint used to access AWS APIs.
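
Putting the new variables together, a fileset configured for bucket list polling might look roughly like this (a sketch: the bucket ARN, interval, and worker count are illustrative values, and the SQS-related variables are simply left unset):

[source,yaml]
----
- module: aws
  s3access:
    enabled: true
    var.bucket: 'arn:aws:s3:::mybucket'
    var.bucket_list_interval: 300s
    var.number_of_workers: 5
----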
16 changes: 14 additions & 2 deletions libbeat/publisher/testing/testing.go
@@ -28,8 +28,9 @@ type TestPublisher struct {

// given channel only.
type ChanClient struct {
done chan struct{}
Channel chan beat.Event
done chan struct{}
Channel chan beat.Event
publishCallback func(event beat.Event)
}

func PublisherWithClient(client beat.Client) beat.Pipeline {
@@ -44,6 +45,13 @@ func (pub *TestPublisher) ConnectWith(_ beat.ClientConfig) (beat.Client, error)
return pub.client, nil
}

func NewChanClientWithCallback(bufSize int, callback func(event beat.Event)) *ChanClient {
chanClient := NewChanClientWith(make(chan beat.Event, bufSize))
chanClient.publishCallback = callback

return chanClient
}

func NewChanClient(bufSize int) *ChanClient {
return NewChanClientWith(make(chan beat.Event, bufSize))
}
@@ -70,6 +78,10 @@ func (c *ChanClient) Publish(event beat.Event) {
select {
case <-c.done:
case c.Channel <- event:
if c.publishCallback != nil {
c.publishCallback(event)
<-c.Channel
}
}
}

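For reference, a minimal sketch of how a test might use the new callback-based client (the import paths assume the `github.com/elastic/beats/v7` module layout at the time of this commit, and the event fields are illustrative, not taken from this change):

```go
package main

import (
	"fmt"

	"github.com/elastic/beats/v7/libbeat/beat"
	"github.com/elastic/beats/v7/libbeat/common"
	pubtest "github.com/elastic/beats/v7/libbeat/publisher/testing"
)

func main() {
	var received []beat.Event

	// The callback runs after the event is placed on the channel;
	// Publish then drains the channel, so the buffered client never fills up.
	client := pubtest.NewChanClientWithCallback(1, func(event beat.Event) {
		received = append(received, event)
	})

	client.Publish(beat.Event{Fields: common.MapStr{"message": "hello"}})
	fmt.Println("events observed by callback:", len(received))
}
```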
81 changes: 73 additions & 8 deletions x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc
@@ -12,12 +12,19 @@
++++

Use the `aws-s3` input to retrieve logs from S3 objects that are pointed to by
S3 notification events read from an SQS queue. This input can, for example, be
S3 notification events read from an SQS queue, or by directly polling the list of S3 objects in an S3 bucket.
The use of SQS notification is preferred: polling the list of S3 objects is expensive
in terms of performance and cost and should be used only when no SQS
notification can be attached to the S3 buckets. This input can, for example, be
used to receive S3 access logs to monitor detailed records for the requests that
are made to a bucket.

This input depends on S3 notifications delivered to an SQS queue for
`s3:ObjectCreated:*` events. You must create an SQS queue and configure S3
The SQS notification method is enabled by setting the `queue_url` configuration value.
The S3 bucket list polling method is enabled by setting the `s3_bucket` configuration value.
The two values cannot be set at the same time, but at least one of them must be set.

When using the SQS notification method, this input depends on S3 notifications delivered
to an SQS queue for `s3:ObjectCreated:*` events. You must create an SQS queue and configure S3
to publish events to the queue.

When processing an S3 object pointed to by an SQS message, if half of the set
@@ -36,6 +43,24 @@ be stopped and the SQS message will be returned back to the queue.
expand_event_list_from_field: Records
----


When using the direct polling of the list of S3 objects in an S3 bucket,
the number of workers that will process the listed S3 objects must be set
through the `number_of_workers` config.
The listing of the S3 bucket is polled according to the time interval defined by
the `bucket_list_interval` config. The default value is 120 seconds.

["source","yaml",subs="attributes"]
----
{beatname_lc}.inputs:
- type: aws-s3
s3_bucket: arn:aws:s3:::test-s3-bucket
number_of_workers: 5
bucket_list_interval: 300s
credential_profile_name: elastic-beats
expand_event_list_from_field: Records
----

The `aws-s3` input supports the following configuration options plus the
<<{beatname_lc}-input-{type}-common-options>> described later.

@@ -211,7 +236,7 @@ configuring multiline options.
[float]
==== `queue_url`

URL of the AWS SQS queue that messages will be received from. Required.
URL of the AWS SQS queue that messages will be received from (Required when `s3_bucket` is not set).

[float]
==== `visibility_timeout`
@@ -242,6 +267,24 @@ The maximum duration that an SQS `ReceiveMessage` call should wait for a message
to arrive in the queue before returning. The default value is `20s`. The maximum
value is `20s`.

[float]
==== `s3_bucket`

ARN of the AWS S3 bucket that will be polled for list operations (Required when `queue_url` is not set).

[float]
==== `bucket_list_interval`

Time interval for polling the listing of the S3 bucket. Defaults to `120s`.


[float]
==== `number_of_workers`

Number of workers that will process the listed S3 objects (Required when `s3_bucket` is set).



[float]
==== `aws credentials`

@@ -251,7 +294,8 @@ see <<aws-credentials-config,AWS credentials options>> for more details.
[float]
=== AWS Permissions

Specific AWS permissions are required for IAM user to access SQS and S3:
Specific AWS permissions are required for the IAM user to access SQS and S3
when using the SQS notification method:

----
s3:GetObject
Expand All @@ -260,6 +304,14 @@ sqs:ChangeMessageVisibility
sqs:DeleteMessage
----

A reduced set of AWS permissions is required for the IAM user to access S3
when using the S3 bucket list polling method:

----
s3:GetObject
s3:ListBucket
----

[float]
=== S3 and SQS setup

@@ -271,7 +323,7 @@ for more details.
[float]
=== Parallel Processing

Multiple Filebeat instances can read from the same SQS queues at the same time.
When using the SQS notifications method, multiple {beatname_uc} instances can read from the same SQS queues at the same time.
To horizontally scale processing when there are large amounts of log data
flowing into an S3 bucket, you can run multiple {beatname_uc} instances that
read from the same SQS queues at the same time. No additional configuration is
@@ -282,15 +334,25 @@ when multiple {beatname_uc} instances are running in parallel. To prevent
{beatname_uc} from receiving and processing the message more than once, set the
visibility timeout.

The visibility timeout begins when SQS returns a message to Filebeat. During
this time, Filebeat processes and deletes the message. However, if Filebeat
The visibility timeout begins when SQS returns a message to {beatname_uc}. During
this time, {beatname_uc} processes and deletes the message. However, if {beatname_uc}
fails before deleting the message and your system doesn't call the DeleteMessage
action for that message before the visibility timeout expires, the message
becomes visible to other {beatname_uc} instances, and the message is received
again. By default, the visibility timeout is set to 5 minutes for aws-s3 input
in {beatname_uc}. 5 minutes is sufficient time for {beatname_uc} to read SQS
messages and process related s3 log files.

When using the S3 bucket list polling method, be aware that if multiple {beatname_uc} instances are running,
they can list the same S3 bucket at the same time. Since the state of the ingested S3 objects is persisted
(upon processing every page of the listing operation) in the `path.data` configuration,
and multiple {beatname_uc} instances cannot share the same `path.data`, this will produce repeated
ingestion of the S3 objects.
Therefore, when using the S3 bucket list polling method, scaling should be
vertical, with a single larger {beatname_uc} instance and a higher `number_of_workers`
config value.
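
For example, a single, vertically scaled instance might be configured along these lines (a sketch: the bucket ARN, worker count, and interval are illustrative values, not recommendations):

["source","yaml",subs="attributes"]
----
{beatname_lc}.inputs:
- type: aws-s3
  s3_bucket: arn:aws:s3:::my-logs-bucket
  number_of_workers: 12
  bucket_list_interval: 300s
----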


[float]
=== Metrics

@@ -308,6 +370,9 @@ observe the activity of the input.
| `sqs_messages_deleted_total` | Number of SQS messages deleted.
| `sqs_message_processing_time` | Histogram of the elapsed SQS processing times in nanoseconds (time of receipt to time of delete/return).
| `s3_objects_requested_total` | Number of S3 objects downloaded.
| `s3_objects_listed_total` | Number of S3 objects returned by list operations.
| `s3_objects_processed_total` | Number of S3 objects that matched file_selectors rules.
| `s3_objects_acked_total` | Number of S3 objects processed that were fully ACKed.
| `s3_bytes_processed_total` | Number of S3 bytes processed.
| `s3_events_created_total` | Number of events created from processing S3 data.
| `s3_objects_inflight_gauge` | Number of S3 objects inflight (gauge).
10 changes: 6 additions & 4 deletions x-pack/filebeat/input/awss3/_meta/terraform/README.md
@@ -16,10 +16,12 @@ before running Terraform or the integration tests. The AWS key must be
authorized to create and destroy S3 buckets and SQS queues.

1. Execute terraform in this directory to create the resources. This will also
write the `outputs.yml`.
write the `outputs.yml`. You can use `export TF_VAR_aws_region=NNNNN` in order
to match the AWS region of the profile you are using.

`terraform apply`


2. (Optional) View the output configuration.

```yaml
@@ -28,14 +30,14 @@
"queue_url": "https://sqs.us-east-1.amazonaws.com/144492464627/filebeat-s3-integtest-8iok1h"
```
2. Execute the integration test.
4. Execute the integration test.
```
cd x-pack/filebeat/inputs/awss3
go test -tags aws,integration -run TestInputRun -v .
go test -tags aws,integration -run TestInputRun.+ -v .
```

3. Cleanup AWS resources. Execute terraform to remove the SQS queue and delete
5. Cleanup AWS resources. Execute terraform to remove the SQS queue and delete
the S3 bucket and its contents.

`terraform destroy`
8 changes: 6 additions & 2 deletions x-pack/filebeat/input/awss3/acker.go
@@ -28,8 +28,8 @@ func newEventACKTracker(ctx context.Context) *eventACKTracker {
return &eventACKTracker{ctx: ctx, cancel: cancel}
}

// Add increments the number of pending ACKs by the specified amount.
func (a *eventACKTracker) Add(messageCount int64) {
// Add increments the number of pending ACKs.
func (a *eventACKTracker) Add() {
a.Lock()
a.pendingACKs++
a.Unlock()
@@ -51,6 +51,10 @@ func (a *eventACKTracker) ACK() {
}

// Wait waits for the number of pending ACKs to be zero.
// Wait must be called sequentially and only after every expected
// Add call has been made. Failing to do so could reset the pendingACKs
// property to 0 and would result in Wait returning even though additional
// calls to `Add` were made without a corresponding `ACK` call.
func (a *eventACKTracker) Wait() {
// If there were never any pending ACKs then cancel the context. (This can
// happen when a document contains no events or cannot be read due to an error).
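To illustrate the ordering constraint described in the `Wait` comment, here is a sketch of a correct call sequence, written as an additional test in the same package (the helpers are the unexported ones shown in this diff; the test itself is not part of the change):

```go
package awss3

import (
	"context"
	"testing"
)

// Sketch: register every expected Add before calling Wait; ACKs may then
// arrive in any order, and Wait returns once no ACKs are pending.
func TestEventACKTrackerAddAllThenWait(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	t.Cleanup(cancel)

	acker := newEventACKTracker(ctx)
	acker.Add() // one Add per expected event, all made up front
	acker.Add()

	acker.ACK()
	acker.ACK()

	acker.Wait() // safe: no further Add calls follow
}
```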
19 changes: 17 additions & 2 deletions x-pack/filebeat/input/awss3/acker_test.go
@@ -18,7 +18,7 @@ func TestEventACKTracker(t *testing.T) {
t.Cleanup(cancel)

acker := newEventACKTracker(ctx)
acker.Add(1)
acker.Add()
acker.ACK()

assert.EqualValues(t, 0, acker.pendingACKs)
@@ -42,7 +42,7 @@ func TestEventACKHandler(t *testing.T) {

// Create acker. Add one pending ACK.
acker := newEventACKTracker(ctx)
acker.Add(1)
acker.Add()

// Create an ACK handler and simulate one ACKed event.
ackHandler := newEventACKHandler()
@@ -52,3 +52,18 @@ }
assert.EqualValues(t, 0, acker.pendingACKs)
assert.ErrorIs(t, acker.ctx.Err(), context.Canceled)
}

func TestEventACKHandlerWait(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

// Create acker. Add one pending ACK.
acker := newEventACKTracker(ctx)
acker.Add()
acker.ACK()
acker.Wait()
acker.Add()

assert.EqualValues(t, 1, acker.pendingACKs)
assert.ErrorIs(t, acker.ctx.Err(), context.Canceled)
}