Skip to content

Commit

Permalink
Collect Cloudwatch metrics from the same timestamp (#11142)
Browse files Browse the repository at this point in the history
* Update aws documentation
* Add checks for instance fields in TestFetch
* Add tests for CheckTimestampInArray and FindTimestamp
* Update comment for CheckTimestampInArray
* Change test to table driven
* Adopt CheckTimestampInArray and FindTimestamp in S3 and SQS
  • Loading branch information
kaiyan-sheng authored Mar 13, 2019
1 parent 8169159 commit 822b95a
Show file tree
Hide file tree
Showing 12 changed files with 252 additions and 47 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Change ECS field cloud.provider to aws. {pull}11023[11023]
- Add documentation about jolokia autodiscover fields. {issue}10925[10925] {pull}10979[10979]
- Add missing aws.ec2.instance.state.name into fields.yml. {issue}11219[11219] {pull}11221[11221]
- Fix ec2 metricset to collect metrics from Cloudwatch with the same timestamp. {pull}11142[11142]

*Packetbeat*

Expand Down
6 changes: 4 additions & 2 deletions metricbeat/docs/modules/aws.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@ see https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp.html[Te
aws> sts get-session-token --serial-number arn:aws:iam::1234:mfa/your-email@example.com --token-code 456789 --duration-seconds 129600
----

Since temporary security credentials are short term, after they expire, the user needs to generate new ones and modify
the aws.yml config file with the new credentials. This will cause data loss if the config file is not update with new
Because temporary security credentials are short term, after they expire, the user needs to generate new ones and modify
the aws.yml config file with the new credentials. Unless https://www.elastic.co/guide/en/beats/metricbeat/current/_live_reloading.html[live reloading]
feature is enabled for Metricbeat, the user needs to manually restart Metricbeat after updating the config file in order
to continue collecting Cloudwatch metrics. This will cause data loss if the config file is not updated with new
credentials before the old ones expire. For Metricbeat, we recommend users to use access keys in config file to enable
aws module making AWS api calls without have to generate new temporary credentials and update the config frequently.

Expand Down
6 changes: 4 additions & 2 deletions x-pack/metricbeat/module/aws/_meta/docs.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ see https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp.html[Te
aws> sts get-session-token --serial-number arn:aws:iam::1234:mfa/your-email@example.com --token-code 456789 --duration-seconds 129600
----

Since temporary security credentials are short term, after they expire, the user needs to generate new ones and modify
the aws.yml config file with the new credentials. This will cause data loss if the config file is not update with new
Because temporary security credentials are short term, after they expire, the user needs to generate new ones and modify
the aws.yml config file with the new credentials. Unless https://www.elastic.co/guide/en/beats/metricbeat/current/_live_reloading.html[live reloading]
feature is enabled for Metricbeat, the user needs to manually restart Metricbeat after updating the config file in order
to continue collecting Cloudwatch metrics. This will cause data loss if the config file is not updated with new
credentials before the old ones expire. For Metricbeat, we recommend users to use access keys in config file to enable
aws module making AWS api calls without have to generate new temporary credentials and update the config frequently.

Expand Down
25 changes: 18 additions & 7 deletions x-pack/metricbeat/module/aws/ec2/ec2.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,12 +166,22 @@ func createCloudWatchEvents(getMetricDataResults []cloudwatch.MetricDataResult,

// AWS EC2 Metrics
mapOfMetricSetFieldResults := make(map[string]interface{})
for _, output := range getMetricDataResults {
if len(output.Values) == 0 {
continue

// Find a timestamp for all metrics in output
timestamp := aws.FindTimestamp(getMetricDataResults)
if !timestamp.IsZero() {
for _, output := range getMetricDataResults {
if len(output.Values) == 0 {
continue
}
exists, timestampIdx := aws.CheckTimestampInArray(timestamp, output.Timestamps)
if exists {
labels := strings.Split(*output.Label, " ")
if len(output.Values) > timestampIdx {
mapOfMetricSetFieldResults[labels[1]] = fmt.Sprint(output.Values[timestampIdx])
}
}
}
labels := strings.Split(*output.Label, " ")
mapOfMetricSetFieldResults[labels[1]] = fmt.Sprint(output.Values[0])
}

resultMetricSetFields, err := aws.EventMapping(mapOfMetricSetFieldResults, schemaMetricSetFields)
Expand All @@ -181,8 +191,9 @@ func createCloudWatchEvents(getMetricDataResults []cloudwatch.MetricDataResult,
}

if len(mapOfMetricSetFieldResults) <= 11 {
info = "Missing Cloudwatch data for instance " + instanceID + ". This is expected for a new instance during the " +
"first data collection. If this shows up multiple times, please recheck the period setting in config."
info = "Missing Cloudwatch data for instance " + instanceID + ". This is expected for non-running instances or " +
"a new instance during the first data collection. If this shows up multiple times, please recheck the period " +
"setting in config."
}

instanceStateName, err := instanceOutput.State.Name.MarshalValue()
Expand Down
8 changes: 7 additions & 1 deletion x-pack/metricbeat/module/aws/ec2/ec2_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,17 @@ func TestFetch(t *testing.T) {
mtest.CheckEventField("service.name", "string", event, t)
mtest.CheckEventField("cloud.availability_zone", "string", event, t)
mtest.CheckEventField("cloud.provider", "string", event, t)
mtest.CheckEventField("cloud.image.id", "string", event, t)
mtest.CheckEventField("cloud.instance.id", "string", event, t)
mtest.CheckEventField("cloud.machine.type", "string", event, t)
mtest.CheckEventField("cloud.provider", "string", event, t)
mtest.CheckEventField("cloud.region", "string", event, t)
mtest.CheckEventField("instance.image.id", "string", event, t)
mtest.CheckEventField("instance.state.name", "string", event, t)
mtest.CheckEventField("instance.state.code", "int", event, t)
mtest.CheckEventField("instance.monitoring.state", "string", event, t)
mtest.CheckEventField("instance.core.count", "int", event, t)
mtest.CheckEventField("instance.threads_per_core", "int", event, t)

// MetricSetField
mtest.CheckEventField("cpu.total.pct", "float", event, t)
mtest.CheckEventField("cpu.credit_usage", "float", event, t)
Expand Down
30 changes: 18 additions & 12 deletions x-pack/metricbeat/module/aws/ec2/ec2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package ec2

import (
"testing"
"time"

awssdk "github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/cloudwatch"
Expand Down Expand Up @@ -165,27 +166,32 @@ func TestCreateCloudWatchEvents(t *testing.T) {
assert.Equal(t, 1, len(instanceIDs))
instanceID := instanceIDs[0]
assert.Equal(t, instanceID, instanceID)
timestamp := time.Now()

getMetricDataOutput := []cloudwatch.MetricDataResult{
{
Id: &id1,
Label: &label1,
Values: []float64{0.25},
Id: &id1,
Label: &label1,
Values: []float64{0.25},
Timestamps: []time.Time{timestamp},
},
{
Id: &id2,
Label: &label2,
Values: []float64{0.0},
Id: &id2,
Label: &label2,
Values: []float64{0.0},
Timestamps: []time.Time{timestamp},
},
{
Id: &id3,
Label: &label3,
Values: []float64{0.0},
Id: &id3,
Label: &label3,
Values: []float64{0.0},
Timestamps: []time.Time{timestamp},
},
{
Id: &id4,
Label: &label4,
Values: []float64{0.0},
Id: &id4,
Label: &label4,
Values: []float64{0.0},
Timestamps: []time.Time{timestamp},
},
}

Expand Down
6 changes: 3 additions & 3 deletions x-pack/metricbeat/module/aws/mtest/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,15 @@ func compareType(metricValue interface{}, expectedType string, metricName string
switch metricValue.(type) {
case float64:
if expectedType != "float" {
err = errors.New("Failed: Field " + metricName + "is not in type " + expectedType)
err = errors.New("Failed: Field " + metricName + " is not in type " + expectedType)
}
case string:
if expectedType != "string" {
err = errors.New("Failed: Field " + metricName + "is not in type " + expectedType)
err = errors.New("Failed: Field " + metricName + " is not in type " + expectedType)
}
case int64:
if expectedType != "int" {
err = errors.New("Failed: Field " + metricName + "is not in type " + expectedType)
err = errors.New("Failed: Field " + metricName + " is not in type " + expectedType)
}
}
return
Expand Down
18 changes: 12 additions & 6 deletions x-pack/metricbeat/module/aws/s3_daily_storage/s3_daily_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,14 +204,20 @@ func createCloudWatchEvents(outputs []cloudwatch.MetricDataResult, regionName st

// AWS s3_daily_storage metrics
mapOfMetricSetFieldResults := make(map[string]interface{})

// Find a timestamp for all metrics in output
if len(outputs) > 0 && len(outputs[0].Timestamps) > 0 {
timestamp := outputs[0].Timestamps[0]
timestamp := aws.FindTimestamp(outputs)
if !timestamp.IsZero() {
for _, output := range outputs {
labels := strings.Split(*output.Label, " ")
// check timestamp to make sure metrics come from the same timestamp
if len(labels) == 3 && labels[0] == bucketName && len(output.Values) > 0 && output.Timestamps[0] == timestamp {
mapOfMetricSetFieldResults[labels[2]] = fmt.Sprint(output.Values[0])
if len(output.Values) == 0 {
continue
}
exists, timestampIdx := aws.CheckTimestampInArray(timestamp, output.Timestamps)
if exists {
labels := strings.Split(*output.Label, " ")
if labels[0] == bucketName && len(output.Values) > timestampIdx {
mapOfMetricSetFieldResults[labels[2]] = fmt.Sprint(output.Values[timestampIdx])
}
}
}
}
Expand Down
18 changes: 12 additions & 6 deletions x-pack/metricbeat/module/aws/s3_request/s3_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,14 +205,20 @@ func createS3RequestEvents(outputs []cloudwatch.MetricDataResult, regionName str

// AWS s3_request metrics
mapOfMetricSetFieldResults := make(map[string]interface{})

// Find a timestamp for all metrics in output
if len(outputs) > 0 && len(outputs[0].Timestamps) > 0 {
timestamp := outputs[0].Timestamps[0]
timestamp := aws.FindTimestamp(outputs)
if !timestamp.IsZero() {
for _, output := range outputs {
labels := strings.Split(*output.Label, " ")
// check timestamp to make sure metrics come from the same timestamp
if len(labels) == 3 && labels[0] == bucketName && len(output.Values) > 0 && output.Timestamps[0] == timestamp {
mapOfMetricSetFieldResults[labels[2]] = fmt.Sprint(output.Values[0])
if len(output.Values) == 0 {
continue
}
exists, timestampIdx := aws.CheckTimestampInArray(timestamp, output.Timestamps)
if exists {
labels := strings.Split(*output.Label, " ")
if labels[0] == bucketName && len(output.Values) > timestampIdx {
mapOfMetricSetFieldResults[labels[2]] = fmt.Sprint(output.Values[timestampIdx])
}
}
}
}
Expand Down
23 changes: 15 additions & 8 deletions x-pack/metricbeat/module/aws/sqs/sqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,15 +180,22 @@ func createEventPerQueue(getMetricDataResults []cloudwatch.MetricDataResult, que

// AWS sqs metrics
mapOfMetricSetFieldResults := make(map[string]interface{})
for _, output := range getMetricDataResults {
if len(output.Values) == 0 {
continue
}
labels := strings.Split(*output.Label, " ")
if labels[0] != queueName {
continue

// Find a timestamp for all metrics in output
timestamp := aws.FindTimestamp(getMetricDataResults)
if !timestamp.IsZero() {
for _, output := range getMetricDataResults {
if len(output.Values) == 0 {
continue
}
exists, timestampIdx := aws.CheckTimestampInArray(timestamp, output.Timestamps)
if exists {
labels := strings.Split(*output.Label, " ")
if labels[0] == queueName && len(output.Values) > timestampIdx {
mapOfMetricSetFieldResults[labels[1]] = fmt.Sprint(output.Values[timestampIdx])
}
}
}
mapOfMetricSetFieldResults[labels[1]] = fmt.Sprint(output.Values[0])
}

resultMetricSetFields, err := aws.EventMapping(mapOfMetricSetFieldResults, schemaMetricFields)
Expand Down
52 changes: 52 additions & 0 deletions x-pack/metricbeat/module/aws/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,55 @@ func GetMetricDataResults(metricDataQueries []cloudwatch.MetricDataQuery, svc cl
func EventMapping(input map[string]interface{}, schema s.Schema) (common.MapStr, error) {
return schema.Apply(input, s.FailOnRequired)
}

// CheckTimestampInArray checks if input timestamp exists in timestampArray and if it exists, return the position.
func CheckTimestampInArray(timestamp time.Time, timestampArray []time.Time) (bool, int) {
for i := 0; i < len(timestampArray); i++ {
if timestamp.Equal(timestampArray[i]) {
return true, i
}
}
return false, -1
}

// FindTimestamp function checks MetricDataResults and find the timestamp to collect metrics from.
// For example, MetricDataResults might look like:
// metricDataResults = [{
// Id: "sqs0",
// Label: "testName SentMessageSize",
// StatusCode: Complete,
// Timestamps: [2019-03-11 17:45:00 +0000 UTC],
// Values: [981]
// } {
// Id: "sqs1",
// Label: "testName NumberOfMessagesSent",
// StatusCode: Complete,
// Timestamps: [2019-03-11 17:45:00 +0000 UTC,2019-03-11 17:40:00 +0000 UTC],
// Values: [0.5,0]
// }]
// This case, we are collecting values for both metrics from timestamp 2019-03-11 17:45:00 +0000 UTC.
func FindTimestamp(getMetricDataResults []cloudwatch.MetricDataResult) time.Time {
timestamp := time.Time{}
for _, output := range getMetricDataResults {
// When there are outputs with one timestamp, use this timestamp.
if output.Timestamps != nil && len(output.Timestamps) == 1 {
// Use the first timestamp from Timestamps field to collect the latest data.
timestamp = output.Timestamps[0]
return timestamp
}
}

// When there is no output with one timestamp, use the latest timestamp from timestamp list.
if timestamp.IsZero() {
for _, output := range getMetricDataResults {
// When there are outputs with one timestamp, use this timestamp
if output.Timestamps != nil && len(output.Timestamps) > 1 {
// Example Timestamps: [2019-03-11 17:36:00 +0000 UTC,2019-03-11 17:31:00 +0000 UTC]
timestamp = output.Timestamps[0]
return timestamp
}
}
}

return timestamp
}
Loading

0 comments on commit 822b95a

Please sign in to comment.