From 822b95a02eb4dde08075040210d6ef2013ec5ac6 Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Wed, 13 Mar 2019 14:46:23 -0600 Subject: [PATCH] Collect Cloudwatch metrics from the same timestamp (#11142) * Update aws documentation * Add checks for instance fields in TestFetch * Add tests for CheckTimestampInArray and FindTimestamp * Update comment for CheckTimestampInArray * Change test to table driven * Adopt CheckTimestampInArray and FindTimestamp in S3 and SQS --- CHANGELOG.next.asciidoc | 1 + metricbeat/docs/modules/aws.asciidoc | 6 +- .../metricbeat/module/aws/_meta/docs.asciidoc | 6 +- x-pack/metricbeat/module/aws/ec2/ec2.go | 25 +++-- .../module/aws/ec2/ec2_integration_test.go | 8 +- x-pack/metricbeat/module/aws/ec2/ec2_test.go | 30 +++-- .../module/aws/mtest/integration.go | 6 +- .../aws/s3_daily_storage/s3_daily_storage.go | 18 ++- .../module/aws/s3_request/s3_request.go | 18 ++- x-pack/metricbeat/module/aws/sqs/sqs.go | 23 ++-- x-pack/metricbeat/module/aws/utils.go | 52 +++++++++ x-pack/metricbeat/module/aws/utils_test.go | 106 ++++++++++++++++++ 12 files changed, 252 insertions(+), 47 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 981574591628..2a0e98b4b4a0 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -197,6 +197,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Change ECS field cloud.provider to aws. {pull}11023[11023] - Add documentation about jolokia autodiscover fields. {issue}10925[10925] {pull}10979[10979] - Add missing aws.ec2.instance.state.name into fields.yml. {issue}11219[11219] {pull}11221[11221] +- Fix ec2 metricset to collect metrics from Cloudwatch with the same timestamp. {pull}11142[11142] *Packetbeat* diff --git a/metricbeat/docs/modules/aws.asciidoc b/metricbeat/docs/modules/aws.asciidoc index 6718eada7ed9..ea919d6ee6a5 100644 --- a/metricbeat/docs/modules/aws.asciidoc +++ b/metricbeat/docs/modules/aws.asciidoc @@ -31,8 +31,10 @@ see https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp.html[Te aws> sts get-session-token --serial-number arn:aws:iam::1234:mfa/your-email@example.com --token-code 456789 --duration-seconds 129600 ---- -Since temporary security credentials are short term, after they expire, the user needs to generate new ones and modify -the aws.yml config file with the new credentials. This will cause data loss if the config file is not update with new +Because temporary security credentials are short term, after they expire, the user needs to generate new ones and modify +the aws.yml config file with the new credentials. Unless https://www.elastic.co/guide/en/beats/metricbeat/current/_live_reloading.html[live reloading] +feature is enabled for Metricbeat, the user needs to manually restart Metricbeat after updating the config file in order +to continue collecting Cloudwatch metrics. This will cause data loss if the config file is not updated with new credentials before the old ones expire. For Metricbeat, we recommend users to use access keys in config file to enable aws module making AWS api calls without have to generate new temporary credentials and update the config frequently. diff --git a/x-pack/metricbeat/module/aws/_meta/docs.asciidoc b/x-pack/metricbeat/module/aws/_meta/docs.asciidoc index 42e6da16527e..d4a39dc7cf4a 100644 --- a/x-pack/metricbeat/module/aws/_meta/docs.asciidoc +++ b/x-pack/metricbeat/module/aws/_meta/docs.asciidoc @@ -24,8 +24,10 @@ see https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp.html[Te aws> sts get-session-token --serial-number arn:aws:iam::1234:mfa/your-email@example.com --token-code 456789 --duration-seconds 129600 ---- -Since temporary security credentials are short term, after they expire, the user needs to generate new ones and modify -the aws.yml config file with the new credentials. This will cause data loss if the config file is not update with new +Because temporary security credentials are short term, after they expire, the user needs to generate new ones and modify +the aws.yml config file with the new credentials. Unless https://www.elastic.co/guide/en/beats/metricbeat/current/_live_reloading.html[live reloading] +feature is enabled for Metricbeat, the user needs to manually restart Metricbeat after updating the config file in order +to continue collecting Cloudwatch metrics. This will cause data loss if the config file is not updated with new credentials before the old ones expire. For Metricbeat, we recommend users to use access keys in config file to enable aws module making AWS api calls without have to generate new temporary credentials and update the config frequently. diff --git a/x-pack/metricbeat/module/aws/ec2/ec2.go b/x-pack/metricbeat/module/aws/ec2/ec2.go index 9033195516c8..1716cd87e1c9 100644 --- a/x-pack/metricbeat/module/aws/ec2/ec2.go +++ b/x-pack/metricbeat/module/aws/ec2/ec2.go @@ -166,12 +166,22 @@ func createCloudWatchEvents(getMetricDataResults []cloudwatch.MetricDataResult, // AWS EC2 Metrics mapOfMetricSetFieldResults := make(map[string]interface{}) - for _, output := range getMetricDataResults { - if len(output.Values) == 0 { - continue + + // Find a timestamp for all metrics in output + timestamp := aws.FindTimestamp(getMetricDataResults) + if !timestamp.IsZero() { + for _, output := range getMetricDataResults { + if len(output.Values) == 0 { + continue + } + exists, timestampIdx := aws.CheckTimestampInArray(timestamp, output.Timestamps) + if exists { + labels := strings.Split(*output.Label, " ") + if len(output.Values) > timestampIdx { + mapOfMetricSetFieldResults[labels[1]] = fmt.Sprint(output.Values[timestampIdx]) + } + } } - labels := strings.Split(*output.Label, " ") - mapOfMetricSetFieldResults[labels[1]] = fmt.Sprint(output.Values[0]) } resultMetricSetFields, err := aws.EventMapping(mapOfMetricSetFieldResults, schemaMetricSetFields) @@ -181,8 +191,9 @@ func createCloudWatchEvents(getMetricDataResults []cloudwatch.MetricDataResult, } if len(mapOfMetricSetFieldResults) <= 11 { - info = "Missing Cloudwatch data for instance " + instanceID + ". This is expected for a new instance during the " + - "first data collection. If this shows up multiple times, please recheck the period setting in config." + info = "Missing Cloudwatch data for instance " + instanceID + ". This is expected for non-running instances or " + + "a new instance during the first data collection. If this shows up multiple times, please recheck the period " + + "setting in config." } instanceStateName, err := instanceOutput.State.Name.MarshalValue() diff --git a/x-pack/metricbeat/module/aws/ec2/ec2_integration_test.go b/x-pack/metricbeat/module/aws/ec2/ec2_integration_test.go index d327c539af90..6261a256c91e 100644 --- a/x-pack/metricbeat/module/aws/ec2/ec2_integration_test.go +++ b/x-pack/metricbeat/module/aws/ec2/ec2_integration_test.go @@ -39,11 +39,17 @@ func TestFetch(t *testing.T) { mtest.CheckEventField("service.name", "string", event, t) mtest.CheckEventField("cloud.availability_zone", "string", event, t) mtest.CheckEventField("cloud.provider", "string", event, t) - mtest.CheckEventField("cloud.image.id", "string", event, t) mtest.CheckEventField("cloud.instance.id", "string", event, t) mtest.CheckEventField("cloud.machine.type", "string", event, t) mtest.CheckEventField("cloud.provider", "string", event, t) mtest.CheckEventField("cloud.region", "string", event, t) + mtest.CheckEventField("instance.image.id", "string", event, t) + mtest.CheckEventField("instance.state.name", "string", event, t) + mtest.CheckEventField("instance.state.code", "int", event, t) + mtest.CheckEventField("instance.monitoring.state", "string", event, t) + mtest.CheckEventField("instance.core.count", "int", event, t) + mtest.CheckEventField("instance.threads_per_core", "int", event, t) + // MetricSetField mtest.CheckEventField("cpu.total.pct", "float", event, t) mtest.CheckEventField("cpu.credit_usage", "float", event, t) diff --git a/x-pack/metricbeat/module/aws/ec2/ec2_test.go b/x-pack/metricbeat/module/aws/ec2/ec2_test.go index b06d4fc386d9..3e440d3920fb 100644 --- a/x-pack/metricbeat/module/aws/ec2/ec2_test.go +++ b/x-pack/metricbeat/module/aws/ec2/ec2_test.go @@ -8,6 +8,7 @@ package ec2 import ( "testing" + "time" awssdk "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/cloudwatch" @@ -165,27 +166,32 @@ func TestCreateCloudWatchEvents(t *testing.T) { assert.Equal(t, 1, len(instanceIDs)) instanceID := instanceIDs[0] assert.Equal(t, instanceID, instanceID) + timestamp := time.Now() getMetricDataOutput := []cloudwatch.MetricDataResult{ { - Id: &id1, - Label: &label1, - Values: []float64{0.25}, + Id: &id1, + Label: &label1, + Values: []float64{0.25}, + Timestamps: []time.Time{timestamp}, }, { - Id: &id2, - Label: &label2, - Values: []float64{0.0}, + Id: &id2, + Label: &label2, + Values: []float64{0.0}, + Timestamps: []time.Time{timestamp}, }, { - Id: &id3, - Label: &label3, - Values: []float64{0.0}, + Id: &id3, + Label: &label3, + Values: []float64{0.0}, + Timestamps: []time.Time{timestamp}, }, { - Id: &id4, - Label: &label4, - Values: []float64{0.0}, + Id: &id4, + Label: &label4, + Values: []float64{0.0}, + Timestamps: []time.Time{timestamp}, }, } diff --git a/x-pack/metricbeat/module/aws/mtest/integration.go b/x-pack/metricbeat/module/aws/mtest/integration.go index afacfdcbb0d3..aabc2f5d0a0b 100644 --- a/x-pack/metricbeat/module/aws/mtest/integration.go +++ b/x-pack/metricbeat/module/aws/mtest/integration.go @@ -76,15 +76,15 @@ func compareType(metricValue interface{}, expectedType string, metricName string switch metricValue.(type) { case float64: if expectedType != "float" { - err = errors.New("Failed: Field " + metricName + "is not in type " + expectedType) + err = errors.New("Failed: Field " + metricName + " is not in type " + expectedType) } case string: if expectedType != "string" { - err = errors.New("Failed: Field " + metricName + "is not in type " + expectedType) + err = errors.New("Failed: Field " + metricName + " is not in type " + expectedType) } case int64: if expectedType != "int" { - err = errors.New("Failed: Field " + metricName + "is not in type " + expectedType) + err = errors.New("Failed: Field " + metricName + " is not in type " + expectedType) } } return diff --git a/x-pack/metricbeat/module/aws/s3_daily_storage/s3_daily_storage.go b/x-pack/metricbeat/module/aws/s3_daily_storage/s3_daily_storage.go index 7110018bbf04..cfee884c668a 100644 --- a/x-pack/metricbeat/module/aws/s3_daily_storage/s3_daily_storage.go +++ b/x-pack/metricbeat/module/aws/s3_daily_storage/s3_daily_storage.go @@ -204,14 +204,20 @@ func createCloudWatchEvents(outputs []cloudwatch.MetricDataResult, regionName st // AWS s3_daily_storage metrics mapOfMetricSetFieldResults := make(map[string]interface{}) + // Find a timestamp for all metrics in output - if len(outputs) > 0 && len(outputs[0].Timestamps) > 0 { - timestamp := outputs[0].Timestamps[0] + timestamp := aws.FindTimestamp(outputs) + if !timestamp.IsZero() { for _, output := range outputs { - labels := strings.Split(*output.Label, " ") - // check timestamp to make sure metrics come from the same timestamp - if len(labels) == 3 && labels[0] == bucketName && len(output.Values) > 0 && output.Timestamps[0] == timestamp { - mapOfMetricSetFieldResults[labels[2]] = fmt.Sprint(output.Values[0]) + if len(output.Values) == 0 { + continue + } + exists, timestampIdx := aws.CheckTimestampInArray(timestamp, output.Timestamps) + if exists { + labels := strings.Split(*output.Label, " ") + if labels[0] == bucketName && len(output.Values) > timestampIdx { + mapOfMetricSetFieldResults[labels[2]] = fmt.Sprint(output.Values[timestampIdx]) + } } } } diff --git a/x-pack/metricbeat/module/aws/s3_request/s3_request.go b/x-pack/metricbeat/module/aws/s3_request/s3_request.go index 67f2cd32425c..5e759930a89a 100644 --- a/x-pack/metricbeat/module/aws/s3_request/s3_request.go +++ b/x-pack/metricbeat/module/aws/s3_request/s3_request.go @@ -205,14 +205,20 @@ func createS3RequestEvents(outputs []cloudwatch.MetricDataResult, regionName str // AWS s3_request metrics mapOfMetricSetFieldResults := make(map[string]interface{}) + // Find a timestamp for all metrics in output - if len(outputs) > 0 && len(outputs[0].Timestamps) > 0 { - timestamp := outputs[0].Timestamps[0] + timestamp := aws.FindTimestamp(outputs) + if !timestamp.IsZero() { for _, output := range outputs { - labels := strings.Split(*output.Label, " ") - // check timestamp to make sure metrics come from the same timestamp - if len(labels) == 3 && labels[0] == bucketName && len(output.Values) > 0 && output.Timestamps[0] == timestamp { - mapOfMetricSetFieldResults[labels[2]] = fmt.Sprint(output.Values[0]) + if len(output.Values) == 0 { + continue + } + exists, timestampIdx := aws.CheckTimestampInArray(timestamp, output.Timestamps) + if exists { + labels := strings.Split(*output.Label, " ") + if labels[0] == bucketName && len(output.Values) > timestampIdx { + mapOfMetricSetFieldResults[labels[2]] = fmt.Sprint(output.Values[timestampIdx]) + } } } } diff --git a/x-pack/metricbeat/module/aws/sqs/sqs.go b/x-pack/metricbeat/module/aws/sqs/sqs.go index 49f7f280750f..ab34379b0cbe 100644 --- a/x-pack/metricbeat/module/aws/sqs/sqs.go +++ b/x-pack/metricbeat/module/aws/sqs/sqs.go @@ -180,15 +180,22 @@ func createEventPerQueue(getMetricDataResults []cloudwatch.MetricDataResult, que // AWS sqs metrics mapOfMetricSetFieldResults := make(map[string]interface{}) - for _, output := range getMetricDataResults { - if len(output.Values) == 0 { - continue - } - labels := strings.Split(*output.Label, " ") - if labels[0] != queueName { - continue + + // Find a timestamp for all metrics in output + timestamp := aws.FindTimestamp(getMetricDataResults) + if !timestamp.IsZero() { + for _, output := range getMetricDataResults { + if len(output.Values) == 0 { + continue + } + exists, timestampIdx := aws.CheckTimestampInArray(timestamp, output.Timestamps) + if exists { + labels := strings.Split(*output.Label, " ") + if labels[0] == queueName && len(output.Values) > timestampIdx { + mapOfMetricSetFieldResults[labels[1]] = fmt.Sprint(output.Values[timestampIdx]) + } + } } - mapOfMetricSetFieldResults[labels[1]] = fmt.Sprint(output.Values[0]) } resultMetricSetFields, err := aws.EventMapping(mapOfMetricSetFieldResults, schemaMetricFields) diff --git a/x-pack/metricbeat/module/aws/utils.go b/x-pack/metricbeat/module/aws/utils.go index e66ad82d6057..2249f82e2266 100644 --- a/x-pack/metricbeat/module/aws/utils.go +++ b/x-pack/metricbeat/module/aws/utils.go @@ -84,3 +84,55 @@ func GetMetricDataResults(metricDataQueries []cloudwatch.MetricDataQuery, svc cl func EventMapping(input map[string]interface{}, schema s.Schema) (common.MapStr, error) { return schema.Apply(input, s.FailOnRequired) } + +// CheckTimestampInArray checks if input timestamp exists in timestampArray and if it exists, return the position. +func CheckTimestampInArray(timestamp time.Time, timestampArray []time.Time) (bool, int) { + for i := 0; i < len(timestampArray); i++ { + if timestamp.Equal(timestampArray[i]) { + return true, i + } + } + return false, -1 +} + +// FindTimestamp function checks MetricDataResults and find the timestamp to collect metrics from. +// For example, MetricDataResults might look like: +// metricDataResults = [{ +// Id: "sqs0", +// Label: "testName SentMessageSize", +// StatusCode: Complete, +// Timestamps: [2019-03-11 17:45:00 +0000 UTC], +// Values: [981] +// } { +// Id: "sqs1", +// Label: "testName NumberOfMessagesSent", +// StatusCode: Complete, +// Timestamps: [2019-03-11 17:45:00 +0000 UTC,2019-03-11 17:40:00 +0000 UTC], +// Values: [0.5,0] +// }] +// This case, we are collecting values for both metrics from timestamp 2019-03-11 17:45:00 +0000 UTC. +func FindTimestamp(getMetricDataResults []cloudwatch.MetricDataResult) time.Time { + timestamp := time.Time{} + for _, output := range getMetricDataResults { + // When there are outputs with one timestamp, use this timestamp. + if output.Timestamps != nil && len(output.Timestamps) == 1 { + // Use the first timestamp from Timestamps field to collect the latest data. + timestamp = output.Timestamps[0] + return timestamp + } + } + + // When there is no output with one timestamp, use the latest timestamp from timestamp list. + if timestamp.IsZero() { + for _, output := range getMetricDataResults { + // When there are outputs with one timestamp, use this timestamp + if output.Timestamps != nil && len(output.Timestamps) > 1 { + // Example Timestamps: [2019-03-11 17:36:00 +0000 UTC,2019-03-11 17:31:00 +0000 UTC] + timestamp = output.Timestamps[0] + return timestamp + } + } + } + + return timestamp +} diff --git a/x-pack/metricbeat/module/aws/utils_test.go b/x-pack/metricbeat/module/aws/utils_test.go index 9b57ecd3e67e..00bab25d80c2 100644 --- a/x-pack/metricbeat/module/aws/utils_test.go +++ b/x-pack/metricbeat/module/aws/utils_test.go @@ -7,6 +7,7 @@ package aws import ( "fmt" "testing" + "time" awssdk "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/cloudwatch" @@ -169,3 +170,108 @@ func TestGetMetricDataResults(t *testing.T) { assert.Equal(t, label4, *getMetricDataResults[3].Label) assert.Equal(t, 0.0, getMetricDataResults[3].Values[0]) } + +func TestCheckTimestampInArray(t *testing.T) { + timestamp1 := time.Now() + timestamp2 := timestamp1.Add(5 * time.Minute) + timestamp3 := timestamp1.Add(10 * time.Minute) + + cases := []struct { + targetTimestamp time.Time + expectedExists bool + expectedIndex int + }{ + { + targetTimestamp: timestamp1, + expectedExists: true, + expectedIndex: 0, + }, + { + targetTimestamp: timestamp3, + expectedExists: false, + expectedIndex: -1, + }, + } + + timestampArray := []time.Time{timestamp1, timestamp2} + for _, c := range cases { + exists, index := CheckTimestampInArray(c.targetTimestamp, timestampArray) + assert.Equal(t, c.expectedExists, exists) + assert.Equal(t, c.expectedIndex, index) + } +} + +func TestFindTimestamp(t *testing.T) { + timestamp1 := time.Now() + timestamp2 := timestamp1.Add(5 * time.Minute) + cases := []struct { + getMetricDataResults []cloudwatch.MetricDataResult + expectedTimestamp time.Time + }{ + { + getMetricDataResults: []cloudwatch.MetricDataResult{ + { + Id: &id1, + Label: &label1, + StatusCode: cloudwatch.StatusCodeComplete, + Timestamps: []time.Time{timestamp1, timestamp2}, + Values: []float64{0, 1}, + }, + { + Id: &id2, + Label: &label2, + StatusCode: cloudwatch.StatusCodeComplete, + Timestamps: []time.Time{timestamp1}, + Values: []float64{2, 3}, + }, + }, + expectedTimestamp: timestamp1, + }, + { + getMetricDataResults: []cloudwatch.MetricDataResult{ + { + Id: &id1, + Label: &label1, + StatusCode: cloudwatch.StatusCodeComplete, + Timestamps: []time.Time{timestamp1, timestamp2}, + Values: []float64{0, 1}, + }, + { + Id: &id2, + Label: &label2, + StatusCode: cloudwatch.StatusCodeComplete, + }, + }, + expectedTimestamp: timestamp1, + }, + { + getMetricDataResults: []cloudwatch.MetricDataResult{ + { + Id: &id1, + Label: &label1, + StatusCode: cloudwatch.StatusCodeComplete, + Timestamps: []time.Time{timestamp1, timestamp2}, + Values: []float64{0, 1}, + }, + { + Id: &id2, + Label: &label2, + StatusCode: cloudwatch.StatusCodeComplete, + }, + { + Id: &id3, + Label: &label2, + StatusCode: cloudwatch.StatusCodeComplete, + Timestamps: []time.Time{timestamp2}, + Values: []float64{2, 3}, + }, + }, + expectedTimestamp: timestamp2, + }, + } + + for _, c := range cases { + outputTimestamp := FindTimestamp(c.getMetricDataResults) + assert.Equal(t, c.expectedTimestamp, outputTimestamp) + } +}