Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Metricbeat] Fix cloudwatch metricset missing tags collection #17424

Merged
merged 8 commits into from
Apr 3, 2020
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Further revise check for bad data in docker/memory. {pull}17400[17400]
- Fix issue in Jolokia module when mbean contains multiple quoted properties. {issue}17375[17375] {pull}17374[17374]
- Combine cloudwatch aggregated metrics into single event. {pull}17345[17345]
- Fix cloudwatch metricset missing tags collection. {issue}17419[17419] {pull}17424[17424]
- check if cpuOptions field is nil in DescribeInstances output in ec2 metricset. {pull}17418[17418]

*Packetbeat*
Expand Down
48 changes: 30 additions & 18 deletions x-pack/metricbeat/module/aws/cloudwatch/cloudwatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ var (
identifierNameIdx = 3
identifierValueIdx = 4
defaultStatistics = []string{"Average", "Maximum", "Minimum", "Sum", "SampleCount"}
labelSeperator = "|"
labelSeparator = "|"
dimensionSeparator = ","
)

// init registers the MetricSet with the central registry as soon as the program
Expand Down Expand Up @@ -360,21 +361,21 @@ func createMetricDataQueries(listMetricsTotal []metricsWithStatistics, period ti

func constructLabel(metric cloudwatch.Metric, statistic string) string {
// label = metricName + namespace + statistic + dimKeys + dimValues
label := *metric.MetricName + labelSeperator + *metric.Namespace + labelSeperator + statistic
label := *metric.MetricName + labelSeparator + *metric.Namespace + labelSeparator + statistic
dimNames := ""
dimValues := ""
for i, dim := range metric.Dimensions {
dimNames += *dim.Name
dimValues += *dim.Value
if i != len(metric.Dimensions)-1 {
dimNames += ","
dimValues += ","
dimNames += dimensionSeparator
dimValues += dimensionSeparator
}
}

if dimNames != "" && dimValues != "" {
label += labelSeperator + dimNames
label += labelSeperator + dimValues
label += labelSeparator + dimNames
label += labelSeparator + dimValues
}
return label
}
Expand All @@ -400,7 +401,7 @@ func generateFieldName(namespace string, labels []string) string {
// Check if statistic method is one of Sum, SampleCount, Minimum, Maximum, Average
// With checkStatistics function, no need to check bool return value here
statMethod, _ := statisticLookup(stat)
// By default, replace dot "." using under bar "_" for metric names
// By default, replace dot "." using underscore "_" for metric names
return "aws." + stripNamespace(namespace) + ".metrics." + common.DeDot(labels[metricNameIdx]) + "." + statMethod
}

Expand Down Expand Up @@ -454,7 +455,7 @@ func (m *MetricSet) createEvents(svcCloudwatch cloudwatchiface.ClientAPI, svcRes

exists, timestampIdx := aws.CheckTimestampInArray(timestamp, output.Timestamps)
if exists {
labels := strings.Split(*output.Label, labelSeperator)
labels := strings.Split(*output.Label, labelSeparator)
if len(labels) != 5 {
// when there is no identifier value in label, use region+accountID+namespace instead
identifier := regionName + m.AccountID + labels[namespaceIdx]
Expand Down Expand Up @@ -503,7 +504,7 @@ func (m *MetricSet) createEvents(svcCloudwatch cloudwatchiface.ClientAPI, svcRes

exists, timestampIdx := aws.CheckTimestampInArray(timestamp, output.Timestamps)
if exists {
labels := strings.Split(*output.Label, labelSeperator)
labels := strings.Split(*output.Label, labelSeparator)
if len(labels) != 5 {
// if there is no tag in labels but there is a tagsFilter, then no event should be reported.
if len(tagsFilter) != 0 {
Expand All @@ -520,20 +521,13 @@ func (m *MetricSet) createEvents(svcCloudwatch cloudwatchiface.ClientAPI, svcRes
}

identifierValue := labels[identifierValueIdx]
tags := resourceTagMap[identifierValue]
if len(tagsFilter) != 0 && len(tags) == 0 {
continue
}

if _, ok := events[identifierValue]; !ok {
events[identifierValue] = aws.InitEvent(regionName, m.AccountName, m.AccountID)
}
events[identifierValue] = insertRootFields(events[identifierValue], output.Values[timestampIdx], labels)

// By default, replace dot "." using under bar "_" for tag keys and values
for _, tag := range tags {
events[identifierValue].RootFields.Put("aws.tags."+common.DeDot(*tag.Key), common.DeDot(*tag.Value))
}
// add tags to event based on identifierValue
insertTags(events, identifierValue, resourceTagMap)
}
}
}
Expand Down Expand Up @@ -567,3 +561,21 @@ func compareAWSDimensions(dim1 []cloudwatch.Dimension, dim2 []cloudwatch.Dimensi
sort.Strings(dim2String)
return reflect.DeepEqual(dim1String, dim2String)
}

func insertTags(events map[string]mb.Event, identifier string, resourceTagMap map[string][]resourcegroupstaggingapi.Tag) {
// Check if identifier includes dimensionSeparator (comma in this case),
// split the identifier and check for each sub-identifier.
// For example, identifier might be [storageType, s3BucketName].
// And tags are only store under s3BucketName in resourceTagMap.
subIdentifiers := strings.Split(identifier, dimensionSeparator)
for _, v := range subIdentifiers {
tags := resourceTagMap[v]
if len(tags) != 0 {
// By default, replace dot "." using underscore "_" for tag keys and values
for _, tag := range tags {
events[identifier].RootFields.Put("aws.tags."+common.DeDot(*tag.Key), common.DeDot(*tag.Value))
}
continue
}
}
}
65 changes: 62 additions & 3 deletions x-pack/metricbeat/module/aws/cloudwatch/cloudwatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"testing"
"time"

"github.com/elastic/beats/v7/metricbeat/mb"

awssdk "github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/cloudwatch"
"github.com/aws/aws-sdk-go-v2/service/cloudwatch/cloudwatchiface"
Expand All @@ -23,9 +25,10 @@ import (
)

var (
regionName = "us-west-1"
timestamp = time.Now()
accountID = "123456789012"
regionName = "us-west-1"
timestamp = time.Now()
accountID = "123456789012"
accountName = "test"

id1 = "cpu"
value1 = 0.25
Expand Down Expand Up @@ -1306,3 +1309,59 @@ func TestCreateEventsWithoutIdentifier(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, value2, dimension)
}

func TestInsertTags(t *testing.T) {
identifier1 := "StandardStorage,test-s3-1"
identifier2 := "test-s3-2"
tagKey1 := "organization"
tagValue1 := "engineering"
tagKey2 := "owner"
tagValue2 := "foo"

events := map[string]mb.Event{}
events[identifier1] = aws.InitEvent(regionName, accountName, accountID)
events[identifier2] = aws.InitEvent(regionName, accountName, accountID)

resourceTagMap := map[string][]resourcegroupstaggingapi.Tag{}
resourceTagMap["test-s3-1"] = []resourcegroupstaggingapi.Tag{
{
Key: awssdk.String(tagKey1),
Value: awssdk.String(tagValue1),
},
}
resourceTagMap["test-s3-2"] = []resourcegroupstaggingapi.Tag{
{
Key: awssdk.String(tagKey2),
Value: awssdk.String(tagValue2),
},
}

cases := []struct {
title string
identifier string
expectedTagKey string
expectedTagValue string
}{
{
"test identifier with storage type and s3 bucket name",
identifier1,
"aws.tags.organization",
tagValue1,
},
{
"test identifier with only s3 bucket name",
identifier2,
"aws.tags.owner",
tagValue2,
},
}

for _, c := range cases {
t.Run(c.title, func(t *testing.T) {
insertTags(events, c.identifier, resourceTagMap)
value, err := events[c.identifier].RootFields.GetValue(c.expectedTagKey)
assert.NoError(t, err)
assert.Equal(t, c.expectedTagValue, value)
})
}
}
2 changes: 1 addition & 1 deletion x-pack/metricbeat/module/aws/sqs/sqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func getQueueUrls(svc sqsiface.ClientAPI) ([]string, error) {
req := svc.ListQueuesRequest(listQueuesInput)
output, err := req.Send(context.TODO())
if err != nil {
err = errors.Wrap(err, "Error DescribeInstances")
err = errors.Wrap(err, "Error ListQueues")
return nil, err
}
return output.QueueUrls, nil
Expand Down