Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Collect Cloudwatch metrics from the same timestamp #11142

Merged
merged 10 commits into from
Mar 13, 2019
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Change ECS field cloud.provider to aws. {pull}11023[11023]
- Add documentation about jolokia autodiscover fields. {issue}10925[10925] {pull}10979[10979]
- Add missing aws.ec2.instance.state.name into fields.yml. {issue}11219[11219] {pull}11221[11221]
- Fix ec2 metricset to collect metrics from Cloudwatch with the same timestamp. {pull}11142[11142]

*Packetbeat*

Expand Down
6 changes: 4 additions & 2 deletions metricbeat/docs/modules/aws.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@ see https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp.html[Te
aws> sts get-session-token --serial-number arn:aws:iam::1234:mfa/your-email@example.com --token-code 456789 --duration-seconds 129600
----

Since temporary security credentials are short term, after they expire, the user needs to generate new ones and modify
the aws.yml config file with the new credentials. This will cause data loss if the config file is not update with new
Because temporary security credentials are short term, after they expire, the user needs to generate new ones and modify
the aws.yml config file with the new credentials. Unless https://www.elastic.co/guide/en/beats/metricbeat/current/_live_reloading.html[live reloading]
feature is enabled for Metricbeat, the user needs to manually restart Metricbeat after updating the config file in order
to continue collecting Cloudwatch metrics. This will cause data loss if the config file is not updated with new
credentials before the old ones expire. For Metricbeat, we recommend users to use access keys in config file to enable
aws module making AWS api calls without have to generate new temporary credentials and update the config frequently.

Expand Down
6 changes: 4 additions & 2 deletions x-pack/metricbeat/module/aws/_meta/docs.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ see https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp.html[Te
aws> sts get-session-token --serial-number arn:aws:iam::1234:mfa/your-email@example.com --token-code 456789 --duration-seconds 129600
----

Since temporary security credentials are short term, after they expire, the user needs to generate new ones and modify
the aws.yml config file with the new credentials. This will cause data loss if the config file is not update with new
Because temporary security credentials are short term, after they expire, the user needs to generate new ones and modify
the aws.yml config file with the new credentials. Unless https://www.elastic.co/guide/en/beats/metricbeat/current/_live_reloading.html[live reloading]
feature is enabled for Metricbeat, the user needs to manually restart Metricbeat after updating the config file in order
to continue collecting Cloudwatch metrics. This will cause data loss if the config file is not updated with new
credentials before the old ones expire. For Metricbeat, we recommend users to use access keys in config file to enable
aws module making AWS api calls without have to generate new temporary credentials and update the config frequently.

Expand Down
25 changes: 18 additions & 7 deletions x-pack/metricbeat/module/aws/ec2/ec2.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,12 +166,22 @@ func createCloudWatchEvents(getMetricDataResults []cloudwatch.MetricDataResult,

// AWS EC2 Metrics
mapOfMetricSetFieldResults := make(map[string]interface{})
for _, output := range getMetricDataResults {
if len(output.Values) == 0 {
continue

// Find a timestamp for all metrics in output
timestamp := aws.FindTimestamp(getMetricDataResults)
if !timestamp.IsZero() {
kaiyan-sheng marked this conversation as resolved.
Show resolved Hide resolved
for _, output := range getMetricDataResults {
kaiyan-sheng marked this conversation as resolved.
Show resolved Hide resolved
if len(output.Values) == 0 {
continue
}
exists, timestampIdx := aws.CheckTimestampInArray(timestamp, output.Timestamps)
if exists {
labels := strings.Split(*output.Label, " ")
if len(output.Values) > timestampIdx {
mapOfMetricSetFieldResults[labels[1]] = fmt.Sprint(output.Values[timestampIdx])
}
}
}
labels := strings.Split(*output.Label, " ")
mapOfMetricSetFieldResults[labels[1]] = fmt.Sprint(output.Values[0])
}

resultMetricSetFields, err := aws.EventMapping(mapOfMetricSetFieldResults, schemaMetricSetFields)
Expand All @@ -181,8 +191,9 @@ func createCloudWatchEvents(getMetricDataResults []cloudwatch.MetricDataResult,
}

if len(mapOfMetricSetFieldResults) <= 11 {
info = "Missing Cloudwatch data for instance " + instanceID + ". This is expected for a new instance during the " +
"first data collection. If this shows up multiple times, please recheck the period setting in config."
info = "Missing Cloudwatch data for instance " + instanceID + ". This is expected for non-running instances or " +
"a new instance during the first data collection. If this shows up multiple times, please recheck the period " +
"setting in config."
}

instanceStateName, err := instanceOutput.State.Name.MarshalValue()
Expand Down
8 changes: 7 additions & 1 deletion x-pack/metricbeat/module/aws/ec2/ec2_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,17 @@ func TestFetch(t *testing.T) {
mtest.CheckEventField("service.name", "string", event, t)
mtest.CheckEventField("cloud.availability_zone", "string", event, t)
mtest.CheckEventField("cloud.provider", "string", event, t)
mtest.CheckEventField("cloud.image.id", "string", event, t)
kaiyan-sheng marked this conversation as resolved.
Show resolved Hide resolved
mtest.CheckEventField("cloud.instance.id", "string", event, t)
mtest.CheckEventField("cloud.machine.type", "string", event, t)
mtest.CheckEventField("cloud.provider", "string", event, t)
mtest.CheckEventField("cloud.region", "string", event, t)
mtest.CheckEventField("instance.image.id", "string", event, t)
mtest.CheckEventField("instance.state.name", "string", event, t)
mtest.CheckEventField("instance.state.code", "int", event, t)
mtest.CheckEventField("instance.monitoring.state", "string", event, t)
mtest.CheckEventField("instance.core.count", "int", event, t)
mtest.CheckEventField("instance.threads_per_core", "int", event, t)

// MetricSetField
mtest.CheckEventField("cpu.total.pct", "float", event, t)
mtest.CheckEventField("cpu.credit_usage", "float", event, t)
Expand Down
30 changes: 18 additions & 12 deletions x-pack/metricbeat/module/aws/ec2/ec2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package ec2

import (
"testing"
"time"

awssdk "github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/cloudwatch"
Expand Down Expand Up @@ -165,27 +166,32 @@ func TestCreateCloudWatchEvents(t *testing.T) {
assert.Equal(t, 1, len(instanceIDs))
instanceID := instanceIDs[0]
assert.Equal(t, instanceID, instanceID)
timestamp := time.Now()

getMetricDataOutput := []cloudwatch.MetricDataResult{
{
Id: &id1,
Label: &label1,
Values: []float64{0.25},
Id: &id1,
Label: &label1,
Values: []float64{0.25},
Timestamps: []time.Time{timestamp},
},
{
Id: &id2,
Label: &label2,
Values: []float64{0.0},
Id: &id2,
Label: &label2,
Values: []float64{0.0},
Timestamps: []time.Time{timestamp},
},
{
Id: &id3,
Label: &label3,
Values: []float64{0.0},
Id: &id3,
Label: &label3,
Values: []float64{0.0},
Timestamps: []time.Time{timestamp},
},
{
Id: &id4,
Label: &label4,
Values: []float64{0.0},
Id: &id4,
Label: &label4,
Values: []float64{0.0},
Timestamps: []time.Time{timestamp},
},
}

Expand Down
6 changes: 3 additions & 3 deletions x-pack/metricbeat/module/aws/mtest/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,15 @@ func compareType(metricValue interface{}, expectedType string, metricName string
switch metricValue.(type) {
case float64:
if expectedType != "float" {
err = errors.New("Failed: Field " + metricName + "is not in type " + expectedType)
err = errors.New("Failed: Field " + metricName + " is not in type " + expectedType)
}
case string:
if expectedType != "string" {
err = errors.New("Failed: Field " + metricName + "is not in type " + expectedType)
err = errors.New("Failed: Field " + metricName + " is not in type " + expectedType)
}
case int64:
if expectedType != "int" {
err = errors.New("Failed: Field " + metricName + "is not in type " + expectedType)
err = errors.New("Failed: Field " + metricName + " is not in type " + expectedType)
}
}
return
Expand Down
18 changes: 12 additions & 6 deletions x-pack/metricbeat/module/aws/s3_daily_storage/s3_daily_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,14 +204,20 @@ func createCloudWatchEvents(outputs []cloudwatch.MetricDataResult, regionName st

// AWS s3_daily_storage metrics
mapOfMetricSetFieldResults := make(map[string]interface{})

// Find a timestamp for all metrics in output
if len(outputs) > 0 && len(outputs[0].Timestamps) > 0 {
timestamp := outputs[0].Timestamps[0]
timestamp := aws.FindTimestamp(outputs)
if !timestamp.IsZero() {
for _, output := range outputs {
labels := strings.Split(*output.Label, " ")
// check timestamp to make sure metrics come from the same timestamp
if len(labels) == 3 && labels[0] == bucketName && len(output.Values) > 0 && output.Timestamps[0] == timestamp {
mapOfMetricSetFieldResults[labels[2]] = fmt.Sprint(output.Values[0])
if len(output.Values) == 0 {
continue
}
exists, timestampIdx := aws.CheckTimestampInArray(timestamp, output.Timestamps)
if exists {
labels := strings.Split(*output.Label, " ")
if labels[0] == bucketName && len(output.Values) > timestampIdx {
mapOfMetricSetFieldResults[labels[2]] = fmt.Sprint(output.Values[timestampIdx])
}
}
}
}
Expand Down
18 changes: 12 additions & 6 deletions x-pack/metricbeat/module/aws/s3_request/s3_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,14 +205,20 @@ func createS3RequestEvents(outputs []cloudwatch.MetricDataResult, regionName str

// AWS s3_request metrics
mapOfMetricSetFieldResults := make(map[string]interface{})

// Find a timestamp for all metrics in output
if len(outputs) > 0 && len(outputs[0].Timestamps) > 0 {
timestamp := outputs[0].Timestamps[0]
timestamp := aws.FindTimestamp(outputs)
if !timestamp.IsZero() {
for _, output := range outputs {
labels := strings.Split(*output.Label, " ")
// check timestamp to make sure metrics come from the same timestamp
if len(labels) == 3 && labels[0] == bucketName && len(output.Values) > 0 && output.Timestamps[0] == timestamp {
mapOfMetricSetFieldResults[labels[2]] = fmt.Sprint(output.Values[0])
if len(output.Values) == 0 {
continue
}
exists, timestampIdx := aws.CheckTimestampInArray(timestamp, output.Timestamps)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like this code is identical to the code in the s3 daily storage file? Perhaps later we can take it out.

if exists {
labels := strings.Split(*output.Label, " ")
if labels[0] == bucketName && len(output.Values) > timestampIdx {
mapOfMetricSetFieldResults[labels[2]] = fmt.Sprint(output.Values[timestampIdx])
}
}
}
}
Expand Down
23 changes: 15 additions & 8 deletions x-pack/metricbeat/module/aws/sqs/sqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,15 +180,22 @@ func createEventPerQueue(getMetricDataResults []cloudwatch.MetricDataResult, que

// AWS sqs metrics
mapOfMetricSetFieldResults := make(map[string]interface{})
for _, output := range getMetricDataResults {
if len(output.Values) == 0 {
continue
}
labels := strings.Split(*output.Label, " ")
if labels[0] != queueName {
continue

// Find a timestamp for all metrics in output
timestamp := aws.FindTimestamp(getMetricDataResults)
if !timestamp.IsZero() {
for _, output := range getMetricDataResults {
if len(output.Values) == 0 {
continue
}
exists, timestampIdx := aws.CheckTimestampInArray(timestamp, output.Timestamps)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One more time the same code? :-)

if exists {
labels := strings.Split(*output.Label, " ")
if labels[0] == queueName && len(output.Values) > timestampIdx {
mapOfMetricSetFieldResults[labels[1]] = fmt.Sprint(output.Values[timestampIdx])
}
}
}
mapOfMetricSetFieldResults[labels[1]] = fmt.Sprint(output.Values[0])
}

resultMetricSetFields, err := aws.EventMapping(mapOfMetricSetFieldResults, schemaMetricFields)
Expand Down
52 changes: 52 additions & 0 deletions x-pack/metricbeat/module/aws/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,55 @@ func GetMetricDataResults(metricDataQueries []cloudwatch.MetricDataQuery, svc cl
func EventMapping(input map[string]interface{}, schema s.Schema) (common.MapStr, error) {
return schema.Apply(input, s.FailOnRequired)
}

// CheckTimestampInArray checks if input timestamp exists in timestampArray and if it exists, return the position.
func CheckTimestampInArray(timestamp time.Time, timestampArray []time.Time) (bool, int) {
for i := 0; i < len(timestampArray); i++ {
if timestamp.Equal(timestampArray[i]) {
return true, i
}
}
return false, -1
}

// FindTimestamp function checks MetricDataResults and find the timestamp to collect metrics from.
// For example, MetricDataResults might look like:
// metricDataResults = [{
// Id: "sqs0",
// Label: "testName SentMessageSize",
// StatusCode: Complete,
// Timestamps: [2019-03-11 17:45:00 +0000 UTC],
// Values: [981]
// } {
// Id: "sqs1",
// Label: "testName NumberOfMessagesSent",
// StatusCode: Complete,
// Timestamps: [2019-03-11 17:45:00 +0000 UTC,2019-03-11 17:40:00 +0000 UTC],
// Values: [0.5,0]
// }]
// This case, we are collecting values for both metrics from timestamp 2019-03-11 17:45:00 +0000 UTC.
func FindTimestamp(getMetricDataResults []cloudwatch.MetricDataResult) time.Time {
timestamp := time.Time{}
for _, output := range getMetricDataResults {
// When there are outputs with one timestamp, use this timestamp.
if output.Timestamps != nil && len(output.Timestamps) == 1 {
// Use the first timestamp from Timestamps field to collect the latest data.
timestamp = output.Timestamps[0]
kaiyan-sheng marked this conversation as resolved.
Show resolved Hide resolved
return timestamp
}
}

// When there is no output with one timestamp, use the latest timestamp from timestamp list.
if timestamp.IsZero() {
for _, output := range getMetricDataResults {
// When there are outputs with one timestamp, use this timestamp
if output.Timestamps != nil && len(output.Timestamps) > 1 {
// Example Timestamps: [2019-03-11 17:36:00 +0000 UTC,2019-03-11 17:31:00 +0000 UTC]
timestamp = output.Timestamps[0]
return timestamp
}
}
}

return timestamp
}
Loading