diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index df3292bce2d..052322fc57a 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -844,6 +844,7 @@ field. You can revert this change by configuring tags for the module and omittin - Move Prometheus query & remote_write to GA. {pull}21507[21507] - Map cloud data filed `cloud.account.id` to azure subscription. {pull}21483[21483] {issue}21381[21381] - Expand unsupported option from namespace to metrics in the azure module. {pull}21486[21486] +- Move s3_daily_storage and s3_request metricsets to use cloudwatch input. {pull}21703[21703] *Packetbeat* diff --git a/metricbeat/docs/modules/aws/s3_daily_storage.asciidoc b/metricbeat/docs/modules/aws/s3_daily_storage.asciidoc index 251fe923231..7fd8a635a3b 100644 --- a/metricbeat/docs/modules/aws/s3_daily_storage.asciidoc +++ b/metricbeat/docs/modules/aws/s3_daily_storage.asciidoc @@ -8,6 +8,7 @@ This file is generated! See scripts/mage/docs_collector.go include::../../../../x-pack/metricbeat/module/aws/s3_daily_storage/_meta/docs.asciidoc[] +This is a default metricset. If the host module is unconfigured, this metricset is enabled by default. ==== Fields diff --git a/metricbeat/docs/modules/aws/s3_request.asciidoc b/metricbeat/docs/modules/aws/s3_request.asciidoc index 53bef698894..e8d26703b17 100644 --- a/metricbeat/docs/modules/aws/s3_request.asciidoc +++ b/metricbeat/docs/modules/aws/s3_request.asciidoc @@ -8,6 +8,7 @@ This file is generated! See scripts/mage/docs_collector.go include::../../../../x-pack/metricbeat/module/aws/s3_request/_meta/docs.asciidoc[] +This is a default metricset. If the host module is unconfigured, this metricset is enabled by default. ==== Fields diff --git a/x-pack/metricbeat/include/list.go b/x-pack/metricbeat/include/list.go index 053385e2e79..e27c7f9a624 100644 --- a/x-pack/metricbeat/include/list.go +++ b/x-pack/metricbeat/include/list.go @@ -16,8 +16,6 @@ import ( _ "github.com/elastic/beats/v7/x-pack/metricbeat/module/aws/cloudwatch" _ "github.com/elastic/beats/v7/x-pack/metricbeat/module/aws/ec2" _ "github.com/elastic/beats/v7/x-pack/metricbeat/module/aws/rds" - _ "github.com/elastic/beats/v7/x-pack/metricbeat/module/aws/s3_daily_storage" - _ "github.com/elastic/beats/v7/x-pack/metricbeat/module/aws/s3_request" _ "github.com/elastic/beats/v7/x-pack/metricbeat/module/aws/sqs" _ "github.com/elastic/beats/v7/x-pack/metricbeat/module/azure" _ "github.com/elastic/beats/v7/x-pack/metricbeat/module/azure/app_insights" diff --git a/x-pack/metricbeat/module/aws/module.yml b/x-pack/metricbeat/module/aws/module.yml index 0be20d1d484..c6129426e8b 100644 --- a/x-pack/metricbeat/module/aws/module.yml +++ b/x-pack/metricbeat/module/aws/module.yml @@ -4,6 +4,8 @@ metricsets: - ebs - usage - sns + - s3_daily_storage + - s3_request - lambda - dynamodb - vpn diff --git a/x-pack/metricbeat/module/aws/mtest/integration.go b/x-pack/metricbeat/module/aws/mtest/integration.go index f99d34bb05f..fba6b9fe3b2 100644 --- a/x-pack/metricbeat/module/aws/mtest/integration.go +++ b/x-pack/metricbeat/module/aws/mtest/integration.go @@ -39,6 +39,7 @@ func GetConfigForTest(t *testing.T, metricSetName string, period string) map[str "access_key_id": accessKeyID, "secret_access_key": secretAccessKey, "default_region": defaultRegion, + "latency": "5m", // You can specify which region to run test on by using regions variable // "regions": []string{"us-east-1"}, } diff --git a/x-pack/metricbeat/module/aws/s3_daily_storage/_meta/data.json b/x-pack/metricbeat/module/aws/s3_daily_storage/_meta/data.json index 722318a2981..4285889e5d4 100644 --- a/x-pack/metricbeat/module/aws/s3_daily_storage/_meta/data.json +++ b/x-pack/metricbeat/module/aws/s3_daily_storage/_meta/data.json @@ -1,27 +1,28 @@ { "@timestamp": "2017-10-12T08:05:34.853Z", "aws": { - "s3": { - "bucket": { - "name": "test-s3-ks-2" - } + "cloudwatch": { + "namespace": "AWS/S3" + }, + "dimensions": { + "BucketName": "filebeat-aws-elb-test", + "StorageType": "AllStorageTypes" }, - "s3_daily_storage": { - "bucket": { - "size": { - "bytes": 207372 + "s3": { + "metrics": { + "NumberOfObjects": { + "avg": 57828 } - }, - "number_of_objects": 128 + } } }, "cloud": { "account": { - "id": "627959692251", - "name": "elastic-test" + "id": "428152502467", + "name": "elastic-beats" }, "provider": "aws", - "region": "ap-southeast-1" + "region": "eu-central-1" }, "event": { "dataset": "aws.s3_daily_storage", diff --git a/x-pack/metricbeat/module/aws/s3_daily_storage/data.go b/x-pack/metricbeat/module/aws/s3_daily_storage/data.go deleted file mode 100644 index 95b5b3cf53d..00000000000 --- a/x-pack/metricbeat/module/aws/s3_daily_storage/data.go +++ /dev/null @@ -1,21 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -package s3_daily_storage - -import ( - s "github.com/elastic/beats/v7/libbeat/common/schema" - c "github.com/elastic/beats/v7/libbeat/common/schema/mapstrstr" -) - -var ( - schemaMetricSetFields = s.Schema{ - "bucket": s.Object{ - "size": s.Object{ - "bytes": c.Float("BucketSizeBytes"), - }, - }, - "number_of_objects": c.Float("NumberOfObjects"), - } -) diff --git a/x-pack/metricbeat/module/aws/s3_daily_storage/manifest.yml b/x-pack/metricbeat/module/aws/s3_daily_storage/manifest.yml new file mode 100644 index 00000000000..83ed42545c0 --- /dev/null +++ b/x-pack/metricbeat/module/aws/s3_daily_storage/manifest.yml @@ -0,0 +1,22 @@ +default: true +input: + module: aws + metricset: cloudwatch + defaults: + metrics: + - namespace: AWS/S3 + statistic: ["Average"] + name: + - BucketSizeBytes + - NumberOfObjects + +processors: + - rename: + ignore_missing: true + fields: + - from: "aws.s3.metrics.NumberOfObjects.avg" + to: "aws.s3_daily_storage.number_of_objects" + - from: "aws.s3.metrics.BucketSizeBytes.avg" + to: "aws.s3_daily_storage.bucket.size.bytes" + - from: "aws.dimensions.BucketName" + to: "aws.s3.bucket.name" diff --git a/x-pack/metricbeat/module/aws/s3_daily_storage/s3_daily_storage.go b/x-pack/metricbeat/module/aws/s3_daily_storage/s3_daily_storage.go deleted file mode 100644 index d5efa36fb03..00000000000 --- a/x-pack/metricbeat/module/aws/s3_daily_storage/s3_daily_storage.go +++ /dev/null @@ -1,219 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -package s3_daily_storage - -import ( - "fmt" - "strconv" - "strings" - "time" - - "github.com/aws/aws-sdk-go-v2/service/cloudwatch" - "github.com/pkg/errors" - - "github.com/elastic/beats/v7/metricbeat/mb" - awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws" - "github.com/elastic/beats/v7/x-pack/metricbeat/module/aws" -) - -var metricsetName = "s3_daily_storage" - -// init registers the MetricSet with the central registry as soon as the program -// starts. The New function will be called later to instantiate an instance of -// the MetricSet for each host defined in the module's configuration. After the -// MetricSet has been created then Fetch will begin to be called periodically. -func init() { - mb.Registry.MustAddMetricSet(aws.ModuleName, metricsetName, New) -} - -// MetricSet holds any configuration or state information. It must implement -// the mb.MetricSet interface. And this is best achieved by embedding -// mb.BaseMetricSet because it implements all of the required mb.MetricSet -// interface methods except for Fetch. -type MetricSet struct { - *aws.MetricSet -} - -// New creates a new instance of the MetricSet. New is responsible for unpacking -// any MetricSet specific configuration options if there are any. -func New(base mb.BaseMetricSet) (mb.MetricSet, error) { - moduleConfig := aws.Config{} - if err := base.Module().UnpackConfig(&moduleConfig); err != nil { - return nil, err - } - - metricSet, err := aws.NewMetricSet(base) - if err != nil { - return nil, errors.Wrap(err, "error creating aws metricset") - } - - // Check if period is set to be multiple of 86400s - remainder := int(metricSet.Period.Seconds()) % 86400 - if remainder != 0 { - err := errors.New("period needs to be set to 86400s (or a multiple of 86400s). " + - "To avoid data missing or extra costs, please make sure period is set correctly " + - "in config.yml") - base.Logger().Info(err) - } - - return &MetricSet{ - MetricSet: metricSet, - }, nil -} - -// Fetch methods implements the data gathering and data conversion to the right -// format. It publishes the event which is then forwarded to the output. In case -// of an error set the Error field of mb.Event or simply call report.Error(). -func (m *MetricSet) Fetch(report mb.ReporterV2) error { - namespace := "AWS/S3" - // Get startTime and endTime - startTime, endTime := aws.GetStartTimeEndTime(m.Period, m.Latency) - m.Logger().Debugf("startTime = %s, endTime = %s", startTime, endTime) - - // GetMetricData for AWS S3 from Cloudwatch - for _, regionName := range m.MetricSet.RegionsList { - awsConfig := m.MetricSet.AwsConfig.Copy() - awsConfig.Region = regionName - - svcCloudwatch := cloudwatch.New(awscommon.EnrichAWSConfigWithEndpoint( - m.Endpoint, "monitoring", regionName, awsConfig)) - - listMetricsOutputs, err := aws.GetListMetricsOutput(namespace, regionName, svcCloudwatch) - if err != nil { - err = errors.Wrap(err, "GetListMetricsOutput failed, skipping region "+regionName) - m.Logger().Error(err.Error()) - report.Error(err) - continue - } - - if listMetricsOutputs == nil || len(listMetricsOutputs) == 0 { - continue - } - - metricDataQueries := constructMetricQueries(listMetricsOutputs, m.Period) - // Use metricDataQueries to make GetMetricData API calls - metricDataOutputs, err := aws.GetMetricDataResults(metricDataQueries, svcCloudwatch, startTime, endTime) - if err != nil { - err = errors.Wrap(err, "GetMetricDataResults failed, skipping region "+regionName) - m.Logger().Error(err) - report.Error(err) - continue - } - - // Create Cloudwatch Events for s3_daily_storage - bucketNames := getBucketNames(listMetricsOutputs) - for _, bucketName := range bucketNames { - event, err := createCloudWatchEvents(metricDataOutputs, regionName, bucketName, m.AccountName, m.AccountID) - if err != nil { - err = errors.Wrap(err, "createCloudWatchEvents failed") - m.Logger().Error(err) - event.Error = err - report.Event(event) - continue - } - - if reported := report.Event(event); !reported { - m.Logger().Debug("Fetch interrupted, failed to emit event") - return nil - } - } - } - - return nil -} - -func getBucketNames(listMetricsOutputs []cloudwatch.Metric) (bucketNames []string) { - for _, output := range listMetricsOutputs { - for _, dim := range output.Dimensions { - if *dim.Name == "BucketName" { - if exists, _ := aws.StringInSlice(*dim.Value, bucketNames); exists { - continue - } - bucketNames = append(bucketNames, *dim.Value) - } - } - } - return -} - -func constructMetricQueries(listMetricsOutputs []cloudwatch.Metric, period time.Duration) []cloudwatch.MetricDataQuery { - var metricDataQueries []cloudwatch.MetricDataQuery - metricDataQueryEmpty := cloudwatch.MetricDataQuery{} - metricNames := []string{"NumberOfObjects", "BucketSizeBytes"} - for i, listMetric := range listMetricsOutputs { - if exists, _ := aws.StringInSlice(*listMetric.MetricName, metricNames); !exists { - continue - } - - metricDataQuery := createMetricDataQuery(listMetric, period, i) - if metricDataQuery == metricDataQueryEmpty { - continue - } - metricDataQueries = append(metricDataQueries, metricDataQuery) - } - return metricDataQueries -} - -func createMetricDataQuery(metric cloudwatch.Metric, period time.Duration, index int) (metricDataQuery cloudwatch.MetricDataQuery) { - statistic := "Average" - periodInSec := int64(period.Seconds()) - id := "s3d" + strconv.Itoa(index) - metricDims := metric.Dimensions - bucketName := "" - storageType := "" - for _, dim := range metricDims { - if *dim.Name == "BucketName" { - bucketName = *dim.Value - } else if *dim.Name == "StorageType" { - storageType = *dim.Value - } - } - metricName := *metric.MetricName - label := bucketName + " " + storageType + " " + metricName - - metricDataQuery = cloudwatch.MetricDataQuery{ - Id: &id, - MetricStat: &cloudwatch.MetricStat{ - Period: &periodInSec, - Stat: &statistic, - Metric: &metric, - }, - Label: &label, - } - return -} - -func createCloudWatchEvents(outputs []cloudwatch.MetricDataResult, regionName string, bucketName string, accountName string, accountID string) (event mb.Event, err error) { - // AWS s3_daily_storage metrics - mapOfMetricSetFieldResults := make(map[string]interface{}) - - // Find a timestamp for all metrics in output - timestamp := aws.FindTimestamp(outputs) - if !timestamp.IsZero() { - for _, output := range outputs { - if len(output.Values) == 0 { - continue - } - exists, timestampIdx := aws.CheckTimestampInArray(timestamp, output.Timestamps) - if exists { - labels := strings.Split(*output.Label, " ") - if labels[0] == bucketName && len(output.Values) > timestampIdx { - mapOfMetricSetFieldResults[labels[2]] = fmt.Sprint(output.Values[timestampIdx]) - } - } - } - } - - resultMetricSetFields, err := aws.EventMapping(mapOfMetricSetFieldResults, schemaMetricSetFields) - if err != nil { - err = errors.Wrap(err, "Error trying to apply schema schemaMetricSetFields in AWS s3_daily_storage metricbeat module.") - return - } - - event = aws.InitEvent(regionName, accountName, accountID, timestamp) - event.MetricSetFields = resultMetricSetFields - event.RootFields.Put("aws.s3.bucket.name", bucketName) - return -} diff --git a/x-pack/metricbeat/module/aws/s3_daily_storage/s3_daily_storage_integration_test.go b/x-pack/metricbeat/module/aws/s3_daily_storage/s3_daily_storage_integration_test.go index 592416a56a8..a398926b462 100644 --- a/x-pack/metricbeat/module/aws/s3_daily_storage/s3_daily_storage_integration_test.go +++ b/x-pack/metricbeat/module/aws/s3_daily_storage/s3_daily_storage_integration_test.go @@ -28,22 +28,17 @@ func TestFetch(t *testing.T) { assert.NotEmpty(t, events) for _, event := range events { - // RootField - mtest.CheckEventField("service.name", "string", event, t) mtest.CheckEventField("cloud.region", "string", event, t) - - // MetricSetField - mtest.CheckEventField("bucket.name", "string", event, t) - mtest.CheckEventField("bucket.size.bytes", "float", event, t) - mtest.CheckEventField("number_of_objects", "float", event, t) + mtest.CheckEventField("aws.dimensions.BucketName", "string", event, t) + mtest.CheckEventField("aws.dimensions.StorageType", "string", event, t) + mtest.CheckEventField("aws.s3.metrics.BucketSizeBytes.avg", "float", event, t) + mtest.CheckEventField("aws.s3.metrics.NumberOfObjects.avg", "float", event, t) } } func TestData(t *testing.T) { config := mtest.GetConfigForTest(t, "s3_daily_storage", "86400s") - metricSet := mbtest.NewReportingMetricSetV2Error(t, config) - if err := mbtest.WriteEventsReporterV2Error(metricSet, t, "/"); err != nil { - t.Fatal("write", err) - } + metricSet := mbtest.NewFetcher(t, config) + metricSet.WriteEvents(t, "/") } diff --git a/x-pack/metricbeat/module/aws/s3_daily_storage/s3_daily_storage_test.go b/x-pack/metricbeat/module/aws/s3_daily_storage/s3_daily_storage_test.go new file mode 100644 index 00000000000..16008223a92 --- /dev/null +++ b/x-pack/metricbeat/module/aws/s3_daily_storage/s3_daily_storage_test.go @@ -0,0 +1,21 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package s3_daily_storage + +import ( + "os" + + "github.com/elastic/beats/v7/metricbeat/mb" + + // Register input module and metricset + _ "github.com/elastic/beats/v7/x-pack/metricbeat/module/aws" + _ "github.com/elastic/beats/v7/x-pack/metricbeat/module/aws/cloudwatch" +) + +func init() { + // To be moved to some kind of helper + os.Setenv("BEAT_STRICT_PERMS", "false") + mb.Registry.SetSecondarySource(mb.NewLightModulesSource("../../../module")) +} diff --git a/x-pack/metricbeat/module/aws/s3_request/_meta/data.json b/x-pack/metricbeat/module/aws/s3_request/_meta/data.json index d8dfc488a83..8bdda96ef8d 100644 --- a/x-pack/metricbeat/module/aws/s3_request/_meta/data.json +++ b/x-pack/metricbeat/module/aws/s3_request/_meta/data.json @@ -1,40 +1,46 @@ { "@timestamp": "2017-10-12T08:05:34.853Z", "aws": { - "s3": { - "bucket": { - "name": "test-s3-ks-2" - } + "cloudwatch": { + "namespace": "AWS/S3" }, - "s3_request": { - "downloaded": { - "bytes": 534 - }, - "errors": { - "4xx": 0, - "5xx": 0 - }, - "latency": { - "first_byte.ms": 214, - "total_request.ms": 533 - }, - "requests": { - "list": 2, - "put": 10, - "total": 12 - }, - "uploaded": { - "bytes": 13572 + "dimensions": { + "BucketName": "test-ks-1", + "FilterId": "EntireBucket" + }, + "s3": { + "metrics": { + "4xxErrors": { + "avg": 0 + }, + "5xxErrors": { + "avg": 0 + }, + "AllRequests": { + "avg": 1 + }, + "BytesUploaded": { + "avg": 684 + }, + "FirstByteLatency": { + "avg": 21.791666666666668 + }, + "PutRequests": { + "avg": 1 + }, + "TotalRequestLatency": { + "avg": 35.916666666666664 + } } } }, "cloud": { "account": { - "id": "627959692251", - "name": "elastic-test" + "id": "428152502467", + "name": "elastic-beats" }, "provider": "aws", - "region": "ap-southeast-1" + "region": "us-east-1" }, "event": { "dataset": "aws.s3_request", diff --git a/x-pack/metricbeat/module/aws/s3_request/data.go b/x-pack/metricbeat/module/aws/s3_request/data.go deleted file mode 100644 index 5c4c2bfa1a4..00000000000 --- a/x-pack/metricbeat/module/aws/s3_request/data.go +++ /dev/null @@ -1,41 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -package s3_request - -import ( - s "github.com/elastic/beats/v7/libbeat/common/schema" - c "github.com/elastic/beats/v7/libbeat/common/schema/mapstrstr" -) - -var ( - schemaMetricSetFields = s.Schema{ - "requests": s.Object{ - "total": c.Int("AllRequests"), - "get": c.Int("GetRequests"), - "put": c.Int("PutRequests"), - "delete": c.Int("DeleteRequests"), - "head": c.Int("HeadRequests"), - "post": c.Int("PostRequests"), - "select": c.Int("SelectRequests"), - "select_scanned.bytes": c.Float("SelectScannedBytes"), - "select_returned.bytes": c.Float("SelectReturnedBytes"), - "list": c.Int("ListRequests"), - }, - "downloaded": s.Object{ - "bytes": c.Float("BytesDownloaded"), - }, - "uploaded": s.Object{ - "bytes": c.Float("BytesUploaded"), - }, - "errors": s.Object{ - "4xx": c.Int("4xxErrors"), - "5xx": c.Int("5xxErrors"), - }, - "latency": s.Object{ - "first_byte.ms": c.Float("FirstByteLatency"), - "total_request.ms": c.Float("TotalRequestLatency"), - }, - } -) diff --git a/x-pack/metricbeat/module/aws/s3_request/manifest.yml b/x-pack/metricbeat/module/aws/s3_request/manifest.yml new file mode 100644 index 00000000000..6a401fed08f --- /dev/null +++ b/x-pack/metricbeat/module/aws/s3_request/manifest.yml @@ -0,0 +1,64 @@ +default: true +input: + module: aws + metricset: cloudwatch + defaults: + metrics: + - namespace: AWS/S3 + statistic: ["Average"] + name: + - AllRequests + - GetRequests + - PutRequests + - DeleteRequests + - HeadRequests + - PostRequests + - SelectRequests + - SelectScannedBytes + - SelectReturnedBytes + - ListRequests + - BytesDownloaded + - BytesUploaded + - 4xxErrors + - 5xxErrors + - FirstByteLatency + - TotalRequestLatency + +processors: + - rename: + ignore_missing: true + fields: + - from: "aws.s3.metrics.AllRequests.avg" + to: "aws.s3_request.requests.total" + - from: "aws.s3.metrics.GetRequests.avg" + to: "aws.s3_request.requests.get" + - from: "aws.s3.metrics.PutRequests.avg" + to: "aws.s3_request.requests.put" + - from: "aws.s3.metrics.DeleteRequests.avg" + to: "aws.s3_request.requests.delete" + - from: "aws.s3.metrics.HeadRequests.avg" + to: "aws.s3_request.requests.head" + - from: "aws.s3.metrics.PostRequests.avg" + to: "aws.s3_request.requests.post" + - from: "aws.s3.metrics.SelectRequests.avg" + to: "aws.s3_request.requests.select" + - from: "aws.s3.metrics.SelectScannedBytes.avg" + to: "aws.s3_request.requests.select_scanned.bytes" + - from: "aws.s3.metrics.SelectReturnedBytes.avg" + to: "aws.s3_request.requests.select_returned.bytes" + - from: "aws.s3.metrics.ListRequests.avg" + to: "aws.s3_request.requests.list" + - from: "aws.s3.metrics.BytesDownloaded.avg" + to: "aws.s3_request.downloaded.bytes" + - from: "aws.s3.metrics.BytesUploaded.avg" + to: "aws.s3_request.uploaded.bytes" + - from: "aws.s3.metrics.4xxErrors.avg" + to: "aws.s3_request.errors.4xx" + - from: "aws.s3.metrics.5xxErrors.avg" + to: "aws.s3_request.errors.5xx" + - from: "aws.s3.metrics.FirstByteLatency.avg" + to: "aws.s3_request.latency.first_byte.ms" + - from: "aws.s3.metrics.TotalRequestLatency.avg" + to: "aws.s3_request.latency.total_request.ms" + - from: "aws.dimensions.BucketName" + to: "aws.s3.bucket.name" diff --git a/x-pack/metricbeat/module/aws/s3_request/s3_request.go b/x-pack/metricbeat/module/aws/s3_request/s3_request.go deleted file mode 100644 index 00b82827bbf..00000000000 --- a/x-pack/metricbeat/module/aws/s3_request/s3_request.go +++ /dev/null @@ -1,221 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -package s3_request - -import ( - "fmt" - "strconv" - "strings" - "time" - - "github.com/aws/aws-sdk-go-v2/service/cloudwatch" - "github.com/pkg/errors" - - "github.com/elastic/beats/v7/metricbeat/mb" - awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws" - "github.com/elastic/beats/v7/x-pack/metricbeat/module/aws" -) - -var metricsetName = "s3_request" - -// init registers the MetricSet with the central registry as soon as the program -// starts. The New function will be called later to instantiate an instance of -// the MetricSet for each host defined in the module's configuration. After the -// MetricSet has been created then Fetch will begin to be called periodically. -func init() { - mb.Registry.MustAddMetricSet(aws.ModuleName, metricsetName, New) -} - -// MetricSet holds any configuration or state information. It must implement -// the mb.MetricSet interface. And this is best achieved by embedding -// mb.BaseMetricSet because it implements all of the required mb.MetricSet -// interface methods except for Fetch. -type MetricSet struct { - *aws.MetricSet -} - -// New creates a new instance of the MetricSet. New is responsible for unpacking -// any MetricSet specific configuration options if there are any. -func New(base mb.BaseMetricSet) (mb.MetricSet, error) { - moduleConfig := aws.Config{} - if err := base.Module().UnpackConfig(&moduleConfig); err != nil { - return nil, err - } - - metricSet, err := aws.NewMetricSet(base) - if err != nil { - return nil, errors.Wrap(err, "error creating aws metricset") - } - - // Check if period is set to be multiple of 60s - remainder := int(metricSet.Period.Seconds()) % 60 - if remainder != 0 { - err := errors.New("period needs to be set to 60s (or a multiple of 60s). " + - "To avoid data missing or extra costs, please make sure period is set correctly " + - "in config.yml") - base.Logger().Info(err) - } - - return &MetricSet{ - MetricSet: metricSet, - }, nil -} - -// Fetch methods implements the data gathering and data conversion to the right -// format. It publishes the event which is then forwarded to the output. In case -// of an error set the Error field of mb.Event or simply call report.Error(). -func (m *MetricSet) Fetch(report mb.ReporterV2) error { - namespace := "AWS/S3" - // Get startTime and endTime - startTime, endTime := aws.GetStartTimeEndTime(m.Period, m.Latency) - m.Logger().Debugf("startTime = %s, endTime = %s", startTime, endTime) - - // GetMetricData for AWS S3 from Cloudwatch - for _, regionName := range m.MetricSet.RegionsList { - awsConfig := m.MetricSet.AwsConfig.Copy() - awsConfig.Region = regionName - - svcCloudwatch := cloudwatch.New(awscommon.EnrichAWSConfigWithEndpoint( - m.Endpoint, "monitoring", regionName, awsConfig)) - - listMetricsOutputs, err := aws.GetListMetricsOutput(namespace, regionName, svcCloudwatch) - if err != nil { - m.Logger().Error(err.Error()) - report.Error(err) - continue - } - - if listMetricsOutputs == nil || len(listMetricsOutputs) == 0 { - continue - } - - metricDataQueries := constructMetricQueries(listMetricsOutputs, m.Period) - // This happens when S3 cloudwatch request metrics are not enabled. - if len(metricDataQueries) == 0 { - continue - } - // Use metricDataQueries to make GetMetricData API calls - metricDataOutputs, err := aws.GetMetricDataResults(metricDataQueries, svcCloudwatch, startTime, endTime) - if err != nil { - err = errors.Wrap(err, "GetMetricDataResults failed, skipping region "+regionName) - m.Logger().Error(err.Error()) - report.Error(err) - continue - } - - // Create Cloudwatch Events for s3_request - bucketNames := getBucketNames(listMetricsOutputs) - for _, bucketName := range bucketNames { - event, err := createS3RequestEvents(metricDataOutputs, regionName, bucketName, m.AccountName, m.AccountID) - if err != nil { - m.Logger().Error(err.Error()) - event.Error = err - report.Event(event) - continue - } - - if reported := report.Event(event); !reported { - m.Logger().Debug("Fetch interrupted, failed to emit event") - return nil - } - } - } - - return nil -} - -func getBucketNames(listMetricsOutputs []cloudwatch.Metric) (bucketNames []string) { - for _, output := range listMetricsOutputs { - for _, dim := range output.Dimensions { - if *dim.Name == "BucketName" { - if exists, _ := aws.StringInSlice(*dim.Value, bucketNames); exists { - continue - } - bucketNames = append(bucketNames, *dim.Value) - } - } - } - return -} - -func createMetricDataQuery(metric cloudwatch.Metric, period time.Duration, index int) (metricDataQuery cloudwatch.MetricDataQuery) { - statistic := "Sum" - periodInSec := int64(period.Seconds()) - id := "s3r" + strconv.Itoa(index) - metricDims := metric.Dimensions - bucketName := "" - filterID := "" - for _, dim := range metricDims { - if *dim.Name == "BucketName" { - bucketName = *dim.Value - } else if *dim.Name == "FilterId" { - filterID = *dim.Value - } - } - metricName := *metric.MetricName - label := bucketName + " " + filterID + " " + metricName - metricDataQuery = cloudwatch.MetricDataQuery{ - Id: &id, - MetricStat: &cloudwatch.MetricStat{ - Period: &periodInSec, - Stat: &statistic, - Metric: &metric, - }, - Label: &label, - } - return -} - -func constructMetricQueries(listMetricsOutputs []cloudwatch.Metric, period time.Duration) []cloudwatch.MetricDataQuery { - var metricDataQueries []cloudwatch.MetricDataQuery - metricDataQueryEmpty := cloudwatch.MetricDataQuery{} - dailyMetricNames := []string{"NumberOfObjects", "BucketSizeBytes"} - for i, listMetric := range listMetricsOutputs { - if exists, _ := aws.StringInSlice(*listMetric.MetricName, dailyMetricNames); exists { - continue - } - - metricDataQuery := createMetricDataQuery(listMetric, period, i) - if metricDataQuery == metricDataQueryEmpty { - continue - } - metricDataQueries = append(metricDataQueries, metricDataQuery) - } - return metricDataQueries -} - -// CreateS3Events creates s3_request and s3_daily_storage events from Cloudwatch metric data. -func createS3RequestEvents(outputs []cloudwatch.MetricDataResult, regionName string, bucketName string, accountName string, accountID string) (event mb.Event, err error) { - // AWS s3_request metrics - mapOfMetricSetFieldResults := make(map[string]interface{}) - - // Find a timestamp for all metrics in output - timestamp := aws.FindTimestamp(outputs) - if !timestamp.IsZero() { - for _, output := range outputs { - if len(output.Values) == 0 { - continue - } - exists, timestampIdx := aws.CheckTimestampInArray(timestamp, output.Timestamps) - if exists { - labels := strings.Split(*output.Label, " ") - if labels[0] == bucketName && len(output.Values) > timestampIdx { - mapOfMetricSetFieldResults[labels[2]] = fmt.Sprint(output.Values[timestampIdx]) - } - } - } - } - - resultMetricSetFields, err := aws.EventMapping(mapOfMetricSetFieldResults, schemaMetricSetFields) - if err != nil { - err = errors.Wrap(err, "Error trying to apply schema schemaMetricSetFields in AWS s3_request metricbeat module.") - return - } - - event = aws.InitEvent(regionName, accountName, accountID, timestamp) - event.MetricSetFields = resultMetricSetFields - event.RootFields.Put("aws.s3.bucket.name", bucketName) - return -} diff --git a/x-pack/metricbeat/module/aws/s3_request/s3_request_integration_test.go b/x-pack/metricbeat/module/aws/s3_request/s3_request_integration_test.go index eeae8439f1f..c7b37de2af3 100644 --- a/x-pack/metricbeat/module/aws/s3_request/s3_request_integration_test.go +++ b/x-pack/metricbeat/module/aws/s3_request/s3_request_integration_test.go @@ -17,7 +17,8 @@ import ( ) func TestFetch(t *testing.T) { - config := mtest.GetConfigForTest(t, "s3_request", "86400s") + t.Skip("flaky test: https://github.com/elastic/beats/issues/21826") + config := mtest.GetConfigForTest(t, "s3_request", "60s") metricSet := mbtest.NewReportingMetricSetV2Error(t, config) events, errs := mbtest.ReportingFetchV2Error(metricSet) @@ -28,36 +29,31 @@ func TestFetch(t *testing.T) { assert.NotEmpty(t, events) for _, event := range events { - // RootField - mtest.CheckEventField("service.name", "string", event, t) mtest.CheckEventField("cloud.region", "string", event, t) - - // MetricSetField - mtest.CheckEventField("bucket.name", "string", event, t) - mtest.CheckEventField("requests.total", "int", event, t) - mtest.CheckEventField("requests.get", "int", event, t) - mtest.CheckEventField("requests.put", "int", event, t) - mtest.CheckEventField("requests.delete", "int", event, t) - mtest.CheckEventField("requests.head", "int", event, t) - mtest.CheckEventField("requests.post", "int", event, t) - mtest.CheckEventField("select.requests", "int", event, t) - mtest.CheckEventField("select_scanned.bytes", "float", event, t) - mtest.CheckEventField("select_returned.bytes", "float", event, t) - mtest.CheckEventField("requests.list", "int", event, t) - mtest.CheckEventField("downloaded.bytes", "float", event, t) - mtest.CheckEventField("uploaded.bytes", "float", event, t) - mtest.CheckEventField("errors.4xx", "int", event, t) - mtest.CheckEventField("errors.5xx", "int", event, t) - mtest.CheckEventField("latency.first_byte.ms", "float", event, t) - mtest.CheckEventField("latency.total_request.ms", "float", event, t) + mtest.CheckEventField("aws.dimensions.BucketName", "string", event, t) + mtest.CheckEventField("aws.dimensions.StorageType", "string", event, t) + mtest.CheckEventField("s3.metrics.AllRequests.avg", "int", event, t) + mtest.CheckEventField("s3.metrics.GetRequests.avg", "int", event, t) + mtest.CheckEventField("s3.metrics.PutRequests.avg", "int", event, t) + mtest.CheckEventField("s3.metrics.DeleteRequests.avg", "int", event, t) + mtest.CheckEventField("s3.metrics.HeadRequests.avg", "int", event, t) + mtest.CheckEventField("s3.metrics.PostRequests.avg", "int", event, t) + mtest.CheckEventField("s3.metrics.SelectRequests.avg", "int", event, t) + mtest.CheckEventField("s3.metrics.SelectScannedBytes.avg", "float", event, t) + mtest.CheckEventField("s3.metrics.SelectReturnedBytes.avg", "float", event, t) + mtest.CheckEventField("s3.metrics.ListRequests.avg", "int", event, t) + mtest.CheckEventField("s3.metrics.BytesDownloaded.avg", "float", event, t) + mtest.CheckEventField("s3.metrics.BytesUploaded.avg", "float", event, t) + mtest.CheckEventField("s3.metrics.4xxErrors.avg", "int", event, t) + mtest.CheckEventField("s3.metrics.5xxErrors.avg", "int", event, t) + mtest.CheckEventField("s3.metrics.FirstByteLatency.avg", "float", event, t) + mtest.CheckEventField("s3.metrics.TotalRequestLatency.avg", "float", event, t) } } func TestData(t *testing.T) { - config := mtest.GetConfigForTest(t, "s3_request", "86400s") + config := mtest.GetConfigForTest(t, "s3_request", "60s") - metricSet := mbtest.NewReportingMetricSetV2Error(t, config) - if err := mbtest.WriteEventsReporterV2Error(metricSet, t, "/"); err != nil { - t.Fatal("write", err) - } + metricSet := mbtest.NewFetcher(t, config) + metricSet.WriteEvents(t, "/") } diff --git a/x-pack/metricbeat/module/aws/s3_request/s3_request_test.go b/x-pack/metricbeat/module/aws/s3_request/s3_request_test.go new file mode 100644 index 00000000000..41098399024 --- /dev/null +++ b/x-pack/metricbeat/module/aws/s3_request/s3_request_test.go @@ -0,0 +1,21 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package s3_request + +import ( + "os" + + "github.com/elastic/beats/v7/metricbeat/mb" + + // Register input module and metricset + _ "github.com/elastic/beats/v7/x-pack/metricbeat/module/aws" + _ "github.com/elastic/beats/v7/x-pack/metricbeat/module/aws/cloudwatch" +) + +func init() { + // To be moved to some kind of helper + os.Setenv("BEAT_STRICT_PERMS", "false") + mb.Registry.SetSecondarySource(mb.NewLightModulesSource("../../../module")) +}