Skip to content

Commit

Permalink
[Metricbeat] Fix cloudwatch metricset missing tags collection (elasti…
Browse files Browse the repository at this point in the history
…c#17424)

* Fix cloudwatch metricset missing tags
  • Loading branch information
kaiyan-sheng authored Apr 3, 2020
1 parent 7c09e29 commit db335be
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 22 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Further revise check for bad data in docker/memory. {pull}17400[17400]
- Fix issue in Jolokia module when mbean contains multiple quoted properties. {issue}17375[17375] {pull}17374[17374]
- Combine cloudwatch aggregated metrics into single event. {pull}17345[17345]
- Fix cloudwatch metricset missing tags collection. {issue}17419[17419] {pull}17424[17424]
- check if cpuOptions field is nil in DescribeInstances output in ec2 metricset. {pull}17418[17418]

*Packetbeat*
Expand Down
48 changes: 30 additions & 18 deletions x-pack/metricbeat/module/aws/cloudwatch/cloudwatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ var (
identifierNameIdx = 3
identifierValueIdx = 4
defaultStatistics = []string{"Average", "Maximum", "Minimum", "Sum", "SampleCount"}
labelSeperator = "|"
labelSeparator = "|"
dimensionSeparator = ","
)

// init registers the MetricSet with the central registry as soon as the program
Expand Down Expand Up @@ -360,21 +361,21 @@ func createMetricDataQueries(listMetricsTotal []metricsWithStatistics, period ti

func constructLabel(metric cloudwatch.Metric, statistic string) string {
// label = metricName + namespace + statistic + dimKeys + dimValues
label := *metric.MetricName + labelSeperator + *metric.Namespace + labelSeperator + statistic
label := *metric.MetricName + labelSeparator + *metric.Namespace + labelSeparator + statistic
dimNames := ""
dimValues := ""
for i, dim := range metric.Dimensions {
dimNames += *dim.Name
dimValues += *dim.Value
if i != len(metric.Dimensions)-1 {
dimNames += ","
dimValues += ","
dimNames += dimensionSeparator
dimValues += dimensionSeparator
}
}

if dimNames != "" && dimValues != "" {
label += labelSeperator + dimNames
label += labelSeperator + dimValues
label += labelSeparator + dimNames
label += labelSeparator + dimValues
}
return label
}
Expand All @@ -400,7 +401,7 @@ func generateFieldName(namespace string, labels []string) string {
// Check if statistic method is one of Sum, SampleCount, Minimum, Maximum, Average
// With checkStatistics function, no need to check bool return value here
statMethod, _ := statisticLookup(stat)
// By default, replace dot "." using under bar "_" for metric names
// By default, replace dot "." using underscore "_" for metric names
return "aws." + stripNamespace(namespace) + ".metrics." + common.DeDot(labels[metricNameIdx]) + "." + statMethod
}

Expand Down Expand Up @@ -454,7 +455,7 @@ func (m *MetricSet) createEvents(svcCloudwatch cloudwatchiface.ClientAPI, svcRes

exists, timestampIdx := aws.CheckTimestampInArray(timestamp, output.Timestamps)
if exists {
labels := strings.Split(*output.Label, labelSeperator)
labels := strings.Split(*output.Label, labelSeparator)
if len(labels) != 5 {
// when there is no identifier value in label, use region+accountID+namespace instead
identifier := regionName + m.AccountID + labels[namespaceIdx]
Expand Down Expand Up @@ -503,7 +504,7 @@ func (m *MetricSet) createEvents(svcCloudwatch cloudwatchiface.ClientAPI, svcRes

exists, timestampIdx := aws.CheckTimestampInArray(timestamp, output.Timestamps)
if exists {
labels := strings.Split(*output.Label, labelSeperator)
labels := strings.Split(*output.Label, labelSeparator)
if len(labels) != 5 {
// if there is no tag in labels but there is a tagsFilter, then no event should be reported.
if len(tagsFilter) != 0 {
Expand All @@ -520,20 +521,13 @@ func (m *MetricSet) createEvents(svcCloudwatch cloudwatchiface.ClientAPI, svcRes
}

identifierValue := labels[identifierValueIdx]
tags := resourceTagMap[identifierValue]
if len(tagsFilter) != 0 && len(tags) == 0 {
continue
}

if _, ok := events[identifierValue]; !ok {
events[identifierValue] = aws.InitEvent(regionName, m.AccountName, m.AccountID)
}
events[identifierValue] = insertRootFields(events[identifierValue], output.Values[timestampIdx], labels)

// By default, replace dot "." using under bar "_" for tag keys and values
for _, tag := range tags {
events[identifierValue].RootFields.Put("aws.tags."+common.DeDot(*tag.Key), common.DeDot(*tag.Value))
}
// add tags to event based on identifierValue
insertTags(events, identifierValue, resourceTagMap)
}
}
}
Expand Down Expand Up @@ -567,3 +561,21 @@ func compareAWSDimensions(dim1 []cloudwatch.Dimension, dim2 []cloudwatch.Dimensi
sort.Strings(dim2String)
return reflect.DeepEqual(dim1String, dim2String)
}

func insertTags(events map[string]mb.Event, identifier string, resourceTagMap map[string][]resourcegroupstaggingapi.Tag) {
// Check if identifier includes dimensionSeparator (comma in this case),
// split the identifier and check for each sub-identifier.
// For example, identifier might be [storageType, s3BucketName].
// And tags are only store under s3BucketName in resourceTagMap.
subIdentifiers := strings.Split(identifier, dimensionSeparator)
for _, v := range subIdentifiers {
tags := resourceTagMap[v]
if len(tags) != 0 {
// By default, replace dot "." using underscore "_" for tag keys and values
for _, tag := range tags {
events[identifier].RootFields.Put("aws.tags."+common.DeDot(*tag.Key), common.DeDot(*tag.Value))
}
continue
}
}
}
65 changes: 62 additions & 3 deletions x-pack/metricbeat/module/aws/cloudwatch/cloudwatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"testing"
"time"

"github.com/elastic/beats/v7/metricbeat/mb"

awssdk "github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/cloudwatch"
"github.com/aws/aws-sdk-go-v2/service/cloudwatch/cloudwatchiface"
Expand All @@ -23,9 +25,10 @@ import (
)

var (
regionName = "us-west-1"
timestamp = time.Now()
accountID = "123456789012"
regionName = "us-west-1"
timestamp = time.Now()
accountID = "123456789012"
accountName = "test"

id1 = "cpu"
value1 = 0.25
Expand Down Expand Up @@ -1306,3 +1309,59 @@ func TestCreateEventsWithoutIdentifier(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, value2, dimension)
}

func TestInsertTags(t *testing.T) {
identifier1 := "StandardStorage,test-s3-1"
identifier2 := "test-s3-2"
tagKey1 := "organization"
tagValue1 := "engineering"
tagKey2 := "owner"
tagValue2 := "foo"

events := map[string]mb.Event{}
events[identifier1] = aws.InitEvent(regionName, accountName, accountID)
events[identifier2] = aws.InitEvent(regionName, accountName, accountID)

resourceTagMap := map[string][]resourcegroupstaggingapi.Tag{}
resourceTagMap["test-s3-1"] = []resourcegroupstaggingapi.Tag{
{
Key: awssdk.String(tagKey1),
Value: awssdk.String(tagValue1),
},
}
resourceTagMap["test-s3-2"] = []resourcegroupstaggingapi.Tag{
{
Key: awssdk.String(tagKey2),
Value: awssdk.String(tagValue2),
},
}

cases := []struct {
title string
identifier string
expectedTagKey string
expectedTagValue string
}{
{
"test identifier with storage type and s3 bucket name",
identifier1,
"aws.tags.organization",
tagValue1,
},
{
"test identifier with only s3 bucket name",
identifier2,
"aws.tags.owner",
tagValue2,
},
}

for _, c := range cases {
t.Run(c.title, func(t *testing.T) {
insertTags(events, c.identifier, resourceTagMap)
value, err := events[c.identifier].RootFields.GetValue(c.expectedTagKey)
assert.NoError(t, err)
assert.Equal(t, c.expectedTagValue, value)
})
}
}
2 changes: 1 addition & 1 deletion x-pack/metricbeat/module/aws/sqs/sqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func getQueueUrls(svc sqsiface.ClientAPI) ([]string, error) {
req := svc.ListQueuesRequest(listQueuesInput)
output, err := req.Send(context.TODO())
if err != nil {
err = errors.Wrap(err, "Error DescribeInstances")
err = errors.Wrap(err, "Error ListQueues")
return nil, err
}
return output.QueueUrls, nil
Expand Down

0 comments on commit db335be

Please sign in to comment.