Adding aws-s3 metric for approximate messages waiting #34488

Merged Feb 28, 2023 (30 commits)
Changes from 28 commits
Commits
31839b7
added metric for polling the approximate message count from sqs
kgeller Feb 6, 2023
20c977d
documentation and rename
kgeller Feb 6, 2023
4472dfe
changelog
kgeller Feb 6, 2023
b8e8eac
Merge branch 'main' of https://github.com/elastic/beats into s3-messa…
kgeller Feb 6, 2023
f56a88e
initializing info for benchmarking
kgeller Feb 6, 2023
4a379f0
more foundational test pieces
kgeller Feb 6, 2023
a428d70
update ticker logic to stop on context cancelled
kgeller Feb 7, 2023
0856bf1
documentation
kgeller Feb 7, 2023
f786f5c
mockgen generation
kgeller Feb 7, 2023
2a7afd5
stop polling on permissions error
kgeller Feb 7, 2023
4e4d0f1
Merge branch 'main' of https://github.com/elastic/beats into s3-messa…
kgeller Feb 7, 2023
ffa7bbf
back to just -1, simpler
kgeller Feb 7, 2023
e651cbf
tests
kgeller Feb 7, 2023
7762df2
cr tweaks
kgeller Feb 9, 2023
820a826
Merge branch 'main' into s3-messages-waiting-metric
kgeller Feb 9, 2023
94393da
updated description
kgeller Feb 9, 2023
b13ec82
adding in flag for permissions
kgeller Feb 10, 2023
83d585a
Merge branch 'main' into s3-messages-waiting-metric
kgeller Feb 10, 2023
afb9e89
Merge branch 'main' into s3-messages-waiting-metric
kgeller Feb 21, 2023
057982d
remove bool and update to initialize after success
kgeller Feb 21, 2023
13ff657
assorted updates; checking for 403, review cleanup
kgeller Feb 22, 2023
69691be
Merge branch 'main' of https://github.com/elastic/beats into s3-messa…
kgeller Feb 22, 2023
9294ef8
Merge branch 'main' into s3-messages-waiting-metric
kgeller Feb 23, 2023
eb7dc99
fixing auth check
kgeller Feb 24, 2023
e3eda2b
Merge branch 'main' into s3-messages-waiting-metric
kgeller Feb 24, 2023
3432255
removing do once
kgeller Feb 27, 2023
8211510
Merge branch 'main' of https://github.com/elastic/beats into s3-messa…
kgeller Feb 27, 2023
1305077
Merge branch 's3-messages-waiting-metric' of github.com:kgeller/beats…
kgeller Feb 27, 2023
673f7a0
reworked logic
kgeller Feb 27, 2023
a98eb74
Merge branch 'main' into s3-messages-waiting-metric
kgeller Feb 27, 2023
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
@@ -212,6 +212,7 @@ automatic splitting at root level, if root level element is an array. {pull}3415
- Add pagination support for Salesforce module. {issue}34057[34057] {pull}34065[34065]
- Allow users to redact sensitive data from CEL input debug logs. {pull}34302[34302]
- Added support for HTTP destination override to Google Cloud Storage input. {pull}34413[34413]
- Added metric `sqs_messages_waiting_gauge` for aws-s3 input. {pull}34488[34488]
- Add support for new Rabbitmq timestamp format for logs {pull}34211[34211]
- Allow user configuration of timezone offset in Cisco ASA and FTD modules. {pull}34436[34436]
- Allow user configuration of timezone offset in Checkpoint module. {pull}34472[34472]
6 changes: 6 additions & 0 deletions x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc
@@ -497,6 +497,11 @@ In case `delete_after_backup` is set the following permission is required as wel
s3:DeleteObject
----

If the optional SQS metric `sqs_messages_waiting_gauge` is desired, the following permission is required:
----
sqs:GetQueueAttributes
----

[float]
=== S3 and SQS setup

@@ -732,6 +737,7 @@ observe the activity of the input.
| `sqs_messages_inflight_gauge` | Number of SQS messages inflight (gauge).
| `sqs_messages_returned_total` | Number of SQS messages returned to queue (happens on errors implicitly after visibility timeout passes).
| `sqs_messages_deleted_total` | Number of SQS messages deleted.
| `sqs_messages_waiting_gauge` | Number of SQS messages waiting in the SQS queue (gauge). The value is refreshed every minute via data from GetQueueAttributes.
| `sqs_message_processing_time` | Histogram of the elapsed SQS processing times in nanoseconds (time of receipt to time of delete/return).
| `sqs_lag_time` | Histogram of the difference between the SQS SentTimestamp attribute and the time when the SQS message was received expressed in nanoseconds.
| `s3_objects_requested_total` | Number of S3 objects downloaded.
36 changes: 35 additions & 1 deletion x-pack/filebeat/input/awss3/input.go
@@ -6,13 +6,16 @@ package awss3

import (
"context"
"errors"
"fmt"
"net/url"
"strings"
"time"

awssdk "github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/sqs"
"github.com/aws/smithy-go"

"github.com/elastic/beats/v7/filebeat/beater"
v2 "github.com/elastic/beats/v7/filebeat/input/v2"
@@ -24,7 +27,10 @@ import (
"github.com/elastic/go-concert/unison"
)

const inputName = "aws-s3"
const (
inputName = "aws-s3"
sqsAccessDeniedErrorCode = "AccessDeniedException"
)

func Plugin(store beater.StateStore) v2.Plugin {
return v2.Plugin{
@@ -123,6 +129,9 @@ func (in *s3Input) Run(inputContext v2.Context, pipeline beat.Pipeline) error {
}
defer receiver.metrics.Close()

// Poll sqs waiting metric periodically in the background.
go pollSqsWaitingMetric(ctx, receiver)

if err := receiver.Receive(ctx); err != nil {
return err
}
@@ -376,5 +385,30 @@ func getProviderFromDomain(endpoint string, ProviderOverride string) string {
return "unknown"
}

func pollSqsWaitingMetric(ctx context.Context, receiver *sqsReader) {
	t := time.NewTicker(time.Minute)
	defer t.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-t.C:
			count, err := receiver.GetApproximateMessageCount(ctx)

			var apiError smithy.APIError
			if errors.As(err, &apiError) {
				switch apiError.ErrorCode() {
				case sqsAccessDeniedErrorCode:
					// stop polling if auth error is encountered
					receiver.metrics.setSQSMessagesWaiting(int64(count))
					return
				}
			}

			receiver.metrics.setSQSMessagesWaiting(int64(count))
		}
	}
}

// boolPtr returns a pointer to b.
func boolPtr(b bool) *bool { return &b }
4 changes: 4 additions & 0 deletions x-pack/filebeat/input/awss3/input_benchmark_test.go
@@ -67,6 +67,10 @@ func (*constantSQS) ChangeMessageVisibility(ctx context.Context, msg *sqsTypes.M
return nil
}

func (c *constantSQS) GetQueueAttributes(ctx context.Context, attr []sqsTypes.QueueAttributeName) (map[string]string, error) {
return map[string]string{}, nil
}

type s3PagerConstant struct {
mutex *sync.Mutex
objects []s3Types.Object
2 changes: 2 additions & 0 deletions x-pack/filebeat/input/awss3/input_integration_test.go
@@ -187,6 +187,7 @@ func TestInputRunSQS(t *testing.T) {
assert.EqualValues(t, s3Input.metrics.sqsMessagesDeletedTotal.Get(), 7)
assert.EqualValues(t, s3Input.metrics.sqsMessagesReturnedTotal.Get(), 1) // Invalid JSON is returned so that it can eventually be DLQed.
assert.EqualValues(t, s3Input.metrics.sqsVisibilityTimeoutExtensionsTotal.Get(), 0)
assert.EqualValues(t, s3Input.metrics.sqsMessagesWaiting.Get(), 0)
assert.EqualValues(t, s3Input.metrics.s3ObjectsInflight.Get(), 0)
assert.EqualValues(t, s3Input.metrics.s3ObjectsRequestedTotal.Get(), 7)
assert.EqualValues(t, s3Input.metrics.s3EventsCreatedTotal.Get(), 12)
@@ -424,6 +425,7 @@ func TestInputRunSNS(t *testing.T) {
assert.EqualValues(t, s3Input.metrics.sqsMessagesDeletedTotal.Get(), 7)
assert.EqualValues(t, s3Input.metrics.sqsMessagesReturnedTotal.Get(), 1) // Invalid JSON is returned so that it can eventually be DLQed.
assert.EqualValues(t, s3Input.metrics.sqsVisibilityTimeoutExtensionsTotal.Get(), 0)
assert.EqualValues(t, s3Input.metrics.sqsMessagesWaiting.Get(), 0)
assert.EqualValues(t, s3Input.metrics.s3ObjectsInflight.Get(), 0)
assert.EqualValues(t, s3Input.metrics.s3ObjectsRequestedTotal.Get(), 7)
assert.EqualValues(t, s3Input.metrics.s3EventsCreatedTotal.Get(), 12)
24 changes: 24 additions & 0 deletions x-pack/filebeat/input/awss3/interfaces.go
@@ -41,6 +41,7 @@ type sqsAPI interface {
sqsReceiver
sqsDeleter
sqsVisibilityChanger
sqsAttributeGetter
}

type sqsReceiver interface {
@@ -55,6 +56,10 @@ type sqsVisibilityChanger interface {
ChangeMessageVisibility(ctx context.Context, msg *types.Message, timeout time.Duration) error
}

type sqsAttributeGetter interface {
GetQueueAttributes(ctx context.Context, attr []types.QueueAttributeName) (map[string]string, error)
}

type sqsProcessor interface {
// ProcessSQS processes an SQS message. It takes full ownership of the
// given message and is responsible for updating the message's visibility
@@ -197,6 +202,25 @@ func (a *awsSQSAPI) ChangeMessageVisibility(ctx context.Context, msg *types.Mess
return nil
}

func (a *awsSQSAPI) GetQueueAttributes(ctx context.Context, attr []types.QueueAttributeName) (map[string]string, error) {
	ctx, cancel := context.WithTimeout(ctx, a.apiTimeout)
	defer cancel()

	attributeOutput, err := a.client.GetQueueAttributes(ctx, &sqs.GetQueueAttributesInput{
		AttributeNames: attr,
		QueueUrl:       awssdk.String(a.queueURL),
	})

	if err != nil {
		if errors.Is(err, context.DeadlineExceeded) {
			err = fmt.Errorf("api_timeout exceeded: %w", err)
		}
		return nil, fmt.Errorf("sqs GetQueueAttributes failed: %w", err)
	}

	return attributeOutput.Attributes, nil
}

// ------
// AWS S3 implementation
// ------
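
`GetQueueAttributes` returns the raw attribute map, so a caller still has to extract and parse the message count. The receiver method that does this (`GetApproximateMessageCount`) is not part of the diff shown here, so the following is only a sketch of how such parsing might look. `approximateMessageCount` is a hypothetical helper; `ApproximateNumberOfMessages` is the SQS attribute name, and returning -1 on failure is an assumption based on the -1 sentinel that `setSQSMessagesWaiting` checks for.

```go
package main

import (
	"fmt"
	"strconv"
)

// attrApproxMessages is the SQS queue attribute holding the approximate
// number of messages available for retrieval.
const attrApproxMessages = "ApproximateNumberOfMessages"

// approximateMessageCount is a hypothetical helper that extracts the message
// count from a GetQueueAttributes result. It returns -1 together with an
// error when the attribute is missing or is not a valid integer.
func approximateMessageCount(attrs map[string]string) (int, error) {
	raw, ok := attrs[attrApproxMessages]
	if !ok {
		return -1, fmt.Errorf("attribute %s not present", attrApproxMessages)
	}
	n, err := strconv.Atoi(raw)
	if err != nil {
		return -1, fmt.Errorf("parsing %s: %w", attrApproxMessages, err)
	}
	return n, nil
}
```

With this shape, a fake such as `constantSQS`, which returns an empty map, would yield -1 and an error rather than a misleading count of zero.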
13 changes: 13 additions & 0 deletions x-pack/filebeat/input/awss3/metrics.go
@@ -15,13 +15,15 @@ import (
)

type inputMetrics struct {
registry *monitoring.Registry
unregister func()

sqsMessagesReceivedTotal *monitoring.Uint // Number of SQS messages received (not necessarily processed fully).
sqsVisibilityTimeoutExtensionsTotal *monitoring.Uint // Number of SQS visibility timeout extensions.
sqsMessagesInflight *monitoring.Uint // Number of SQS messages inflight (gauge).
sqsMessagesReturnedTotal *monitoring.Uint // Number of SQS messages returned to queue (happens on errors implicitly after visibility timeout passes).
sqsMessagesDeletedTotal *monitoring.Uint // Number of SQS messages deleted.
sqsMessagesWaiting *monitoring.Int // Number of SQS messages waiting in the SQS queue (gauge). The value is refreshed every minute via data from GetQueueAttributes.
sqsMessageProcessingTime metrics.Sample // Histogram of the elapsed SQS processing times in nanoseconds (time of receipt to time of delete/return).
sqsLagTime metrics.Sample // Histogram of the difference between the SQS SentTimestamp attribute and the time when the SQS message was received expressed in nanoseconds.

@@ -40,10 +42,21 @@ func (m *inputMetrics) Close() {
m.unregister()
}

func (m *inputMetrics) setSQSMessagesWaiting(count int64) {
	if m.sqsMessagesWaiting == nil {
		// if metric not initialized, and count is -1, do nothing
		if count == -1 {
			return
		}
		// register the gauge once; later calls only update its value
		m.sqsMessagesWaiting = monitoring.NewInt(m.registry, "sqs_messages_waiting_gauge")
	}

	m.sqsMessagesWaiting.Set(count)
}

func newInputMetrics(id string, optionalParent *monitoring.Registry) *inputMetrics {
reg, unreg := inputmon.NewInputRegistry(inputName, id, optionalParent)

out := &inputMetrics{
registry: reg,
unregister: unreg,
sqsMessagesReceivedTotal: monitoring.NewUint(reg, "sqs_messages_received_total"),
sqsVisibilityTimeoutExtensionsTotal: monitoring.NewUint(reg, "sqs_visibility_timeout_extensions_total"),
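
The waiting gauge is created lazily, so that it only appears once a reading has succeeded. A lazily created metric should be registered exactly once, because a registry will typically reject a second registration under the same name. The sketch below shows that pattern with a hypothetical in-memory `registry`; it is not the `monitoring` package, and `gaugeHolder`, `newInt` and `setWaiting` are illustrative names.

```go
package main

import "fmt"

// registry is a hypothetical, minimal stand-in for a metrics registry that
// rejects duplicate names.
type registry struct{ vars map[string]*int64 }

// newInt registers a new integer metric and fails if the name is taken.
func (r *registry) newInt(name string) (*int64, error) {
	if _, dup := r.vars[name]; dup {
		return nil, fmt.Errorf("metric %q already registered", name)
	}
	v := new(int64)
	r.vars[name] = v
	return v, nil
}

// gaugeHolder owns a gauge that is only registered after the first
// successful reading.
type gaugeHolder struct {
	reg     *registry
	waiting *int64 // nil until the first successful reading
}

// setWaiting records count. While the gauge is unregistered, the -1 sentinel
// is ignored so that the metric stays absent instead of reporting -1.
func (g *gaugeHolder) setWaiting(count int64) error {
	if g.waiting == nil {
		if count == -1 {
			return nil
		}
		v, err := g.reg.newInt("sqs_messages_waiting_gauge")
		if err != nil {
			return err
		}
		g.waiting = v
	}
	*g.waiting = count
	return nil
}
```

Once the gauge exists, a later -1 is written through, which makes a permissions failure that occurs after startup visible in the metric.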