Commit

Cherry-pick to 7.0: Collect Cloudwatch metrics from the same timestamp (#11142) (#11236)

* Collect Cloudwatch metrics from the same timestamp (#11142)

* Update aws documentation
* Add checks for instance fields in TestFetch
* Add tests for CheckTimestampInArray and FindTimestamp
* Update comment for CheckTimestampInArray
* Change test to table driven
* Adopt CheckTimestampInArray and FindTimestamp in S3 and SQS

(cherry picked from commit 822b95a)

* Backport changes for CheckEventField and compareType from e03f966
kaiyan-sheng authored Mar 15, 2019
1 parent dc3b6da commit bbb0b89
Showing 9 changed files with 236 additions and 42 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
@@ -88,6 +88,7 @@ https://github.com/elastic/beats/compare/v7.0.0-beta1...master[Check the HEAD di
- Fix parsing error using GET in Jolokia module. {pull}11075[11075] {issue}11071[11071]
- Add documentation about jolokia autodiscover fields. {issue}10925[10925] {pull}10979[10979]
- Add missing aws.ec2.instance.state.name into fields.yml. {issue}11219[11219] {pull}11221[11221]
- Fix ec2 metricset to collect metrics from Cloudwatch with the same timestamp. {pull}11142[11142]

*Packetbeat*

6 changes: 4 additions & 2 deletions metricbeat/docs/modules/aws.asciidoc
@@ -31,8 +31,10 @@ see https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp.html[Te
aws> sts get-session-token --serial-number arn:aws:iam::1234:mfa/your-email@example.com --token-code 456789 --duration-seconds 129600
----

-Since temporary security credentials are short term, after they expire, the user needs to generate new ones and modify
-the aws.yml config file with the new credentials. This will cause data loss if the config file is not update with new
+Because temporary security credentials are short term, after they expire, the user needs to generate new ones and modify
+the aws.yml config file with the new credentials. Unless https://www.elastic.co/guide/en/beats/metricbeat/current/_live_reloading.html[live reloading]
+feature is enabled for Metricbeat, the user needs to manually restart Metricbeat after updating the config file in order
+to continue collecting Cloudwatch metrics. This will cause data loss if the config file is not updated with new
credentials before the old ones expire. For Metricbeat, we recommend users to use access keys in config file to enable
aws module making AWS api calls without have to generate new temporary credentials and update the config frequently.

6 changes: 4 additions & 2 deletions x-pack/metricbeat/module/aws/_meta/docs.asciidoc
@@ -24,8 +24,10 @@ see https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp.html[Te
aws> sts get-session-token --serial-number arn:aws:iam::1234:mfa/your-email@example.com --token-code 456789 --duration-seconds 129600
----

-Since temporary security credentials are short term, after they expire, the user needs to generate new ones and modify
-the aws.yml config file with the new credentials. This will cause data loss if the config file is not update with new
+Because temporary security credentials are short term, after they expire, the user needs to generate new ones and modify
+the aws.yml config file with the new credentials. Unless https://www.elastic.co/guide/en/beats/metricbeat/current/_live_reloading.html[live reloading]
+feature is enabled for Metricbeat, the user needs to manually restart Metricbeat after updating the config file in order
+to continue collecting Cloudwatch metrics. This will cause data loss if the config file is not updated with new
credentials before the old ones expire. For Metricbeat, we recommend users to use access keys in config file to enable
aws module making AWS api calls without have to generate new temporary credentials and update the config frequently.

25 changes: 18 additions & 7 deletions x-pack/metricbeat/module/aws/ec2/ec2.go
@@ -166,12 +166,22 @@ func createCloudWatchEvents(getMetricDataResults []cloudwatch.MetricDataResult,

// AWS EC2 Metrics
mapOfMetricSetFieldResults := make(map[string]interface{})
-for _, output := range getMetricDataResults {
-if len(output.Values) == 0 {
-continue
+
+// Find a timestamp for all metrics in output
+timestamp := aws.FindTimestamp(getMetricDataResults)
+if !timestamp.IsZero() {
+for _, output := range getMetricDataResults {
+if len(output.Values) == 0 {
+continue
}
+exists, timestampIdx := aws.CheckTimestampInArray(timestamp, output.Timestamps)
+if exists {
+labels := strings.Split(*output.Label, " ")
+if len(output.Values) > timestampIdx {
+mapOfMetricSetFieldResults[labels[1]] = fmt.Sprint(output.Values[timestampIdx])
+}
+}
+}
-labels := strings.Split(*output.Label, " ")
-mapOfMetricSetFieldResults[labels[1]] = fmt.Sprint(output.Values[0])
}

resultMetricSetFields, err := aws.EventMapping(mapOfMetricSetFieldResults, schemaMetricSetFields)
@@ -181,8 +191,9 @@ func createCloudWatchEvents(getMetricDataResults []cloudwatch.MetricDataResult,
}

if len(mapOfMetricSetFieldResults) <= 11 {
-info = "Missing Cloudwatch data for instance " + instanceID + ". This is expected for a new instance during the " +
-"first data collection. If this shows up multiple times, please recheck the period setting in config."
+info = "Missing Cloudwatch data for instance " + instanceID + ". This is expected for non-running instances or " +
+"a new instance during the first data collection. If this shows up multiple times, please recheck the period " +
+"setting in config."
}

instanceStateName, err := instanceOutput.State.Name.MarshalValue()
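
The change to createCloudWatchEvents can be read as "pick one timestamp, then read every metric at that timestamp" instead of "read Values[0] from every metric". The following is a minimal, self-contained sketch of that flow, not the module's code: `metricResult` is a hypothetical stand-in for `cloudwatch.MetricDataResult`, and `findTimestamp`, `indexOf`, `collectAt`, and `sampleResults` are illustrative names, as is the `i-1` instance label. The guard on the label format is an addition made for the sketch; the diff indexes `labels[1]` directly.

```go
package main

import (
	"fmt"
	"strings"
	"time"
)

// metricResult is a hypothetical stand-in for cloudwatch.MetricDataResult,
// keeping only the fields the diff reads.
type metricResult struct {
	Label      string
	Timestamps []time.Time
	Values     []float64
}

// findTimestamp follows the selection order in the diff: prefer a result that
// reports exactly one timestamp, otherwise fall back to the first timestamp
// of the first result that reports several.
func findTimestamp(results []metricResult) time.Time {
	for _, r := range results {
		if len(r.Timestamps) == 1 {
			return r.Timestamps[0]
		}
	}
	for _, r := range results {
		if len(r.Timestamps) > 1 {
			return r.Timestamps[0]
		}
	}
	return time.Time{}
}

// indexOf returns the position of ts in list, or -1 if it is absent.
func indexOf(ts time.Time, list []time.Time) int {
	for i, t := range list {
		if ts.Equal(t) {
			return i
		}
	}
	return -1
}

// collectAt gathers one value per metric, all taken at the same timestamp.
func collectAt(results []metricResult) map[string]string {
	fields := make(map[string]string)
	ts := findTimestamp(results)
	if ts.IsZero() {
		return fields
	}
	for _, r := range results {
		idx := indexOf(ts, r.Timestamps)
		if idx < 0 || idx >= len(r.Values) {
			continue // no data point for this metric at the chosen timestamp
		}
		parts := strings.Split(r.Label, " ")
		if len(parts) < 2 {
			continue // label is not in the assumed "<name> <metric>" form
		}
		fields[parts[1]] = fmt.Sprint(r.Values[idx])
	}
	return fields
}

// sampleResults builds two metrics whose newest data points differ.
func sampleResults() []metricResult {
	newer := time.Date(2019, 3, 11, 17, 45, 0, 0, time.UTC)
	older := newer.Add(-5 * time.Minute)
	return []metricResult{
		{Label: "i-1 CPUUtilization", Timestamps: []time.Time{older}, Values: []float64{0.25}},
		{Label: "i-1 NetworkIn", Timestamps: []time.Time{newer, older}, Values: []float64{10, 7}},
	}
}

func main() {
	fmt.Println(collectAt(sampleResults()))
}
```

With the sample data, NetworkIn is read at index 1 (the 17:40 point) so that both values share a timestamp. Reading Values[0] for every metric, as the removed lines did, would have paired a 17:40 CPU value with a 17:45 network value.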
8 changes: 7 additions & 1 deletion x-pack/metricbeat/module/aws/ec2/ec2_integration_test.go
@@ -37,11 +37,17 @@ func TestFetch(t *testing.T) {
mtest.CheckEventField("service.name", "string", event, t)
mtest.CheckEventField("cloud.availability_zone", "string", event, t)
mtest.CheckEventField("cloud.provider", "string", event, t)
mtest.CheckEventField("cloud.image.id", "string", event, t)
mtest.CheckEventField("cloud.instance.id", "string", event, t)
mtest.CheckEventField("cloud.machine.type", "string", event, t)
mtest.CheckEventField("cloud.provider", "string", event, t)
mtest.CheckEventField("cloud.region", "string", event, t)
mtest.CheckEventField("instance.image.id", "string", event, t)
mtest.CheckEventField("instance.state.name", "string", event, t)
mtest.CheckEventField("instance.state.code", "int", event, t)
mtest.CheckEventField("instance.monitoring.state", "string", event, t)
mtest.CheckEventField("instance.core.count", "int", event, t)
mtest.CheckEventField("instance.threads_per_core", "int", event, t)

// MetricSetField
mtest.CheckEventField("cpu.total.pct", "float", event, t)
mtest.CheckEventField("cpu.credit_usage", "float", event, t)
30 changes: 18 additions & 12 deletions x-pack/metricbeat/module/aws/ec2/ec2_test.go
@@ -8,6 +8,7 @@ package ec2

import (
"testing"
"time"

awssdk "github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/cloudwatch"
@@ -171,27 +172,32 @@ func TestCreateCloudWatchEvents(t *testing.T) {
assert.Equal(t, 1, len(instanceIDs))
instanceID := instanceIDs[0]
assert.Equal(t, instanceID, instanceID)
timestamp := time.Now()

getMetricDataOutput := []cloudwatch.MetricDataResult{
{
-Id: &id1,
-Label: &label1,
-Values: []float64{0.25},
+Id: &id1,
+Label: &label1,
+Values: []float64{0.25},
+Timestamps: []time.Time{timestamp},
},
{
-Id: &id2,
-Label: &label2,
-Values: []float64{0.0},
+Id: &id2,
+Label: &label2,
+Values: []float64{0.0},
+Timestamps: []time.Time{timestamp},
},
{
-Id: &id3,
-Label: &label3,
-Values: []float64{0.0},
+Id: &id3,
+Label: &label3,
+Values: []float64{0.0},
+Timestamps: []time.Time{timestamp},
},
{
-Id: &id4,
-Label: &label4,
-Values: []float64{0.0},
+Id: &id4,
+Label: &label4,
+Values: []float64{0.0},
+Timestamps: []time.Time{timestamp},
},
}

44 changes: 26 additions & 18 deletions x-pack/metricbeat/module/aws/mtest/integration.go
@@ -5,6 +5,7 @@
package mtest

import (
"errors"
"os"
"testing"

@@ -48,36 +49,43 @@ func GetConfigForTest(metricSetName string) (map[string]interface{}, string) {

// CheckEventField function checks a given field type and compares it with the expected type for integration tests.
func CheckEventField(metricName string, expectedType string, event mb.Event, t *testing.T) {
-if ok, err := event.MetricSetFields.HasKey(metricName); ok {
-assert.NoError(t, err)
-metricValue, err := event.MetricSetFields.GetValue(metricName)
-assert.NoError(t, err)
-compareType(metricValue, expectedType, t)
-} else if ok, err := event.RootFields.HasKey(metricName); ok {
-assert.NoError(t, err)
-rootValue, err := event.RootFields.GetValue(metricName)
-assert.NoError(t, err)
-compareType(rootValue, expectedType, t)
+ok1, err1 := event.MetricSetFields.HasKey(metricName)
+ok2, err2 := event.RootFields.HasKey(metricName)
+if ok1 || ok2 {
+if ok1 {
+assert.NoError(t, err1)
+metricValue, err := event.MetricSetFields.GetValue(metricName)
+assert.NoError(t, err)
+err = compareType(metricValue, expectedType, metricName)
+assert.NoError(t, err)
+t.Log("Succeed: Field " + metricName + " matches type " + expectedType)
+} else if ok2 {
+assert.NoError(t, err2)
+rootValue, err := event.RootFields.GetValue(metricName)
+assert.NoError(t, err)
+err = compareType(rootValue, expectedType, metricName)
+assert.NoError(t, err)
+t.Log("Succeed: Field " + metricName + " matches type " + expectedType)
+}
} else {
t.Log("Field " + metricName + " does not exist in metric set fields")
}
}

-func compareType(metricValue interface{}, expectedType string, t *testing.T) {
+func compareType(metricValue interface{}, expectedType string, metricName string) (err error) {
switch metricValue.(type) {
case float64:
if expectedType != "float" {
-t.Log("Failed: Field is not in type " + expectedType)
-t.Fail()
+err = errors.New("Failed: Field " + metricName + "is not in type " + expectedType)
}
case string:
if expectedType != "string" {
-t.Log("Failed: Field is not in type " + expectedType)
-t.Fail()
+err = errors.New("Failed: Field " + metricName + "is not in type " + expectedType)
}
case int64:
if expectedType != "int" {
-t.Log("Failed: Field is not in type " + expectedType)
-t.Fail()
+err = errors.New("Failed: Field " + metricName + "is not in type " + expectedType)
}
}
-t.Log("Succeed: Field matches type " + expectedType)
+return
}
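
The reworked compareType returns an error instead of failing the test itself, which lets the caller decide how to report a mismatch. Note that the messages in the diff concatenate metricName directly with "is not in type", so the rendered text has no space between the field name and "is". A minimal sketch of the same idea using a format string, which sidesteps that kind of spacing slip, might look like the following. `checkType` is a hypothetical name and the message wording is invented for illustration; it is not the function in the commit.

```go
package main

import "fmt"

// checkType is a hypothetical variant of compareType from the diff. It maps
// the dynamic type of value to the type names used by the integration tests
// and returns an error when the name differs from expectedType.
func checkType(value interface{}, expectedType, fieldName string) error {
	var actual string
	switch value.(type) {
	case float64:
		actual = "float"
	case string:
		actual = "string"
	case int64:
		actual = "int"
	default:
		// As in the diff, types outside the switch are not validated.
		return nil
	}
	if actual != expectedType {
		return fmt.Errorf("field %s is not of type %s (got %s)", fieldName, expectedType, actual)
	}
	return nil
}

func main() {
	fmt.Println(checkType(0.5, "float", "cpu.total.pct"))
	fmt.Println(checkType("running", "int", "instance.state.code"))
}
```

In a test, the returned error could be passed to `assert.NoError` in the same way CheckEventField does in the diff.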
52 changes: 52 additions & 0 deletions x-pack/metricbeat/module/aws/utils.go
@@ -84,3 +84,55 @@ func GetMetricDataResults(metricDataQueries []cloudwatch.MetricDataQuery, svc cl
func EventMapping(input map[string]interface{}, schema s.Schema) (common.MapStr, error) {
return schema.Apply(input, s.FailOnRequired)
}

// CheckTimestampInArray checks if input timestamp exists in timestampArray and if it exists, return the position.
func CheckTimestampInArray(timestamp time.Time, timestampArray []time.Time) (bool, int) {
for i := 0; i < len(timestampArray); i++ {
if timestamp.Equal(timestampArray[i]) {
return true, i
}
}
return false, -1
}

// FindTimestamp function checks MetricDataResults and find the timestamp to collect metrics from.
// For example, MetricDataResults might look like:
// metricDataResults = [{
// Id: "sqs0",
// Label: "testName SentMessageSize",
// StatusCode: Complete,
// Timestamps: [2019-03-11 17:45:00 +0000 UTC],
// Values: [981]
// } {
// Id: "sqs1",
// Label: "testName NumberOfMessagesSent",
// StatusCode: Complete,
// Timestamps: [2019-03-11 17:45:00 +0000 UTC,2019-03-11 17:40:00 +0000 UTC],
// Values: [0.5,0]
// }]
// This case, we are collecting values for both metrics from timestamp 2019-03-11 17:45:00 +0000 UTC.
func FindTimestamp(getMetricDataResults []cloudwatch.MetricDataResult) time.Time {
timestamp := time.Time{}
for _, output := range getMetricDataResults {
// When there are outputs with one timestamp, use this timestamp.
if output.Timestamps != nil && len(output.Timestamps) == 1 {
// Use the first timestamp from Timestamps field to collect the latest data.
timestamp = output.Timestamps[0]
return timestamp
}
}

// When there is no output with one timestamp, use the latest timestamp from timestamp list.
if timestamp.IsZero() {
for _, output := range getMetricDataResults {
// When there are outputs with one timestamp, use this timestamp
if output.Timestamps != nil && len(output.Timestamps) > 1 {
// Example Timestamps: [2019-03-11 17:36:00 +0000 UTC,2019-03-11 17:31:00 +0000 UTC]
timestamp = output.Timestamps[0]
return timestamp
}
}
}

return timestamp
}
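
CheckTimestampInArray compares timestamps with `time.Time.Equal` rather than `==`. A short sketch of why that distinction can matter in Go: `==` on `time.Time` also compares the location (and any monotonic clock reading), while `Equal` compares only the instant. `sameInstant` and `exampleTimes` are hypothetical helper names, and the UTC-5 zone is an arbitrary choice for the example.

```go
package main

import (
	"fmt"
	"time"
)

// sameInstant reports whether a and b denote the same moment in time,
// regardless of the location attached to each value.
func sameInstant(a, b time.Time) bool {
	return a.Equal(b)
}

// exampleTimes returns one instant expressed in UTC and in a fixed UTC-5 zone.
func exampleTimes() (time.Time, time.Time) {
	utc := time.Date(2019, 3, 11, 17, 45, 0, 0, time.UTC)
	shifted := utc.In(time.FixedZone("UTC-5", -5*60*60))
	return utc, shifted
}

func main() {
	utc, shifted := exampleTimes()
	fmt.Println(utc == shifted)            // false: == also compares the Location
	fmt.Println(sameInstant(utc, shifted)) // true: same instant
}
```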
106 changes: 106 additions & 0 deletions x-pack/metricbeat/module/aws/utils_test.go
@@ -7,6 +7,7 @@ package aws
import (
"fmt"
"testing"
"time"

awssdk "github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/cloudwatch"
@@ -169,3 +170,108 @@ func TestGetMetricDataResults(t *testing.T) {
assert.Equal(t, label4, *getMetricDataResults[3].Label)
assert.Equal(t, 0.0, getMetricDataResults[3].Values[0])
}

func TestCheckTimestampInArray(t *testing.T) {
timestamp1 := time.Now()
timestamp2 := timestamp1.Add(5 * time.Minute)
timestamp3 := timestamp1.Add(10 * time.Minute)

cases := []struct {
targetTimestamp time.Time
expectedExists bool
expectedIndex int
}{
{
targetTimestamp: timestamp1,
expectedExists: true,
expectedIndex: 0,
},
{
targetTimestamp: timestamp3,
expectedExists: false,
expectedIndex: -1,
},
}

timestampArray := []time.Time{timestamp1, timestamp2}
for _, c := range cases {
exists, index := CheckTimestampInArray(c.targetTimestamp, timestampArray)
assert.Equal(t, c.expectedExists, exists)
assert.Equal(t, c.expectedIndex, index)
}
}

func TestFindTimestamp(t *testing.T) {
timestamp1 := time.Now()
timestamp2 := timestamp1.Add(5 * time.Minute)
cases := []struct {
getMetricDataResults []cloudwatch.MetricDataResult
expectedTimestamp time.Time
}{
{
getMetricDataResults: []cloudwatch.MetricDataResult{
{
Id: &id1,
Label: &label1,
StatusCode: cloudwatch.StatusCodeComplete,
Timestamps: []time.Time{timestamp1, timestamp2},
Values: []float64{0, 1},
},
{
Id: &id2,
Label: &label2,
StatusCode: cloudwatch.StatusCodeComplete,
Timestamps: []time.Time{timestamp1},
Values: []float64{2, 3},
},
},
expectedTimestamp: timestamp1,
},
{
getMetricDataResults: []cloudwatch.MetricDataResult{
{
Id: &id1,
Label: &label1,
StatusCode: cloudwatch.StatusCodeComplete,
Timestamps: []time.Time{timestamp1, timestamp2},
Values: []float64{0, 1},
},
{
Id: &id2,
Label: &label2,
StatusCode: cloudwatch.StatusCodeComplete,
},
},
expectedTimestamp: timestamp1,
},
{
getMetricDataResults: []cloudwatch.MetricDataResult{
{
Id: &id1,
Label: &label1,
StatusCode: cloudwatch.StatusCodeComplete,
Timestamps: []time.Time{timestamp1, timestamp2},
Values: []float64{0, 1},
},
{
Id: &id2,
Label: &label2,
StatusCode: cloudwatch.StatusCodeComplete,
},
{
Id: &id3,
Label: &label2,
StatusCode: cloudwatch.StatusCodeComplete,
Timestamps: []time.Time{timestamp2},
Values: []float64{2, 3},
},
},
expectedTimestamp: timestamp2,
},
}

for _, c := range cases {
outputTimestamp := FindTimestamp(c.getMetricDataResults)
assert.Equal(t, c.expectedTimestamp, outputTimestamp)
}
}
