Skip to content

Commit

Permalink
[8.3](backport #32408) [AWS] Fix CloudWatch readCloudwatchConfig func…
Browse files Browse the repository at this point in the history
…tion (#32418)

(cherry picked from commit 50f968d)

* fix return error for reportEvents

Co-authored-by: kaiyan-sheng <kaiyan.sheng@elastic.co>
  • Loading branch information
mergify[bot] and kaiyan-sheng authored Jul 22, 2022
1 parent 2a23d4a commit 1755b5d
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 27 deletions.
45 changes: 19 additions & 26 deletions x-pack/metricbeat/module/aws/cloudwatch/cloudwatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
package cloudwatch

import (
"errors"
"fmt"
"reflect"
"strconv"
"strings"
Expand All @@ -14,7 +16,6 @@ import (
"github.com/aws/aws-sdk-go-v2/service/cloudwatch/cloudwatchiface"
"github.com/aws/aws-sdk-go-v2/service/resourcegroupstaggingapi"
"github.com/aws/aws-sdk-go-v2/service/resourcegroupstaggingapi/resourcegroupstaggingapiiface"
"github.com/pkg/errors"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/metricbeat/mb"
Expand Down Expand Up @@ -97,7 +98,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
logger := logp.NewLogger(metricsetName)
metricSet, err := aws.NewMetricSet(base)
if err != nil {
return nil, errors.Wrap(err, "error creating aws metricset")
return nil, fmt.Errorf("error creating aws metricset: %w", err)
}

config := struct {
Expand All @@ -106,7 +107,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {

err = base.Module().UnpackConfig(&config)
if err != nil {
return nil, errors.Wrap(err, "error unpack raw module config using UnpackConfig")
return nil, fmt.Errorf("error unpack raw module config using UnpackConfig: %w", err)
}

logger.Debugf("cloudwatch config = %s", config)
Expand All @@ -132,7 +133,7 @@ func (m *MetricSet) Fetch(report mb.ReporterV2) error {
// Check statistic method in config
err := m.checkStatistics()
if err != nil {
return errors.Wrap(err, "checkStatistics failed")
return fmt.Errorf("checkStatistics failed: %w", err)
}

// Get listMetricDetailTotal and namespaceDetailTotal from configuration
Expand Down Expand Up @@ -162,14 +163,14 @@ func (m *MetricSet) Fetch(report mb.ReporterV2) error {

eventsWithIdentifier, err := m.createEvents(svcCloudwatch, svcResourceAPI, listMetricDetailTotal.metricsWithStats, listMetricDetailTotal.resourceTypeFilters, regionName, startTime, endTime)
if err != nil {
return errors.Wrap(err, "createEvents failed for region "+regionName)
return fmt.Errorf("createEvents failed for region %s: %w", regionName, err)
}

m.logger.Debugf("Collected metrics of metrics = %d", len(eventsWithIdentifier))

err = reportEvents(eventsWithIdentifier, report)
if err != nil {
return errors.Wrap(err, "reportEvents failed")
return fmt.Errorf("reportEvents failed: %w", err)
}
}
}
Expand All @@ -196,7 +197,7 @@ func (m *MetricSet) Fetch(report mb.ReporterV2) error {
continue
}

if listMetricsOutput == nil || len(listMetricsOutput) == 0 {
if len(listMetricsOutput) == 0 {
continue
}

Expand All @@ -207,14 +208,14 @@ func (m *MetricSet) Fetch(report mb.ReporterV2) error {

eventsWithIdentifier, err := m.createEvents(svcCloudwatch, svcResourceAPI, filteredMetricWithStatsTotal, resourceTypeTagFilters, regionName, startTime, endTime)
if err != nil {
return errors.Wrap(err, "createEvents failed for region "+regionName)
return fmt.Errorf("createEvents failed for region %s: %w", regionName, err)
}

m.logger.Debugf("Collected number of metrics = %d", len(eventsWithIdentifier))

err = reportEvents(addMetadata(namespace, m.Endpoint, regionName, awsConfig, config.AWSConfig.FIPSEnabled, eventsWithIdentifier), report)
if err != nil {
return errors.Wrap(err, "reportEvents failed")
return fmt.Errorf("reportEvents failed: %w", err)
}
}
}
Expand Down Expand Up @@ -343,11 +344,7 @@ func (m *MetricSet) readCloudwatchConfig() (listMetricWithDetail, map[string][]n
}

if config.ResourceType != "" {
if _, ok := resourceTypesWithTags[config.ResourceType]; ok {
resourceTypesWithTags[config.ResourceType] = m.MetricSet.TagsFilter
} else {
resourceTypesWithTags[config.ResourceType] = append(resourceTypesWithTags[config.ResourceType], m.MetricSet.TagsFilter...)
}
resourceTypesWithTags[config.ResourceType] = m.MetricSet.TagsFilter
}
continue
}
Expand All @@ -360,11 +357,7 @@ func (m *MetricSet) readCloudwatchConfig() (listMetricWithDetail, map[string][]n
dimensions: cloudwatchDimensions,
}

if _, ok := namespaceDetailTotal[config.Namespace]; ok {
namespaceDetailTotal[config.Namespace] = append(namespaceDetailTotal[config.Namespace], configPerNamespace)
} else {
namespaceDetailTotal[config.Namespace] = []namespaceDetail{configPerNamespace}
}
namespaceDetailTotal[config.Namespace] = append(namespaceDetailTotal[config.Namespace], configPerNamespace)
}

listMetricDetailTotal.resourceTypeFilters = resourceTypesWithTags
Expand Down Expand Up @@ -451,16 +444,16 @@ func stripNamespace(namespace string) string {

func insertRootFields(event mb.Event, metricValue float64, labels []string) mb.Event {
namespace := labels[namespaceIdx]
event.RootFields.Put(generateFieldName(namespace, labels), metricValue)
event.RootFields.Put("aws.cloudwatch.namespace", namespace)
_, _ = event.RootFields.Put(generateFieldName(namespace, labels), metricValue)
_, _ = event.RootFields.Put("aws.cloudwatch.namespace", namespace)
if len(labels) == 3 {
return event
}

dimNames := strings.Split(labels[identifierNameIdx], ",")
dimValues := strings.Split(labels[identifierValueIdx], ",")
for i := 0; i < len(dimNames); i++ {
event.RootFields.Put("aws.dimensions."+dimNames[i], dimValues[i])
_, _ = event.RootFields.Put("aws.dimensions."+dimNames[i], dimValues[i])
}
return event
}
Expand All @@ -480,7 +473,7 @@ func (m *MetricSet) createEvents(svcCloudwatch cloudwatchiface.ClientAPI, svcRes
metricDataResults, err := aws.GetMetricDataResults(metricDataQueries, svcCloudwatch, startTime, endTime)
m.logger.Debugf("Number of metricDataResults = %d", len(metricDataResults))
if err != nil {
return events, errors.Wrap(err, "GetMetricDataResults failed")
return events, fmt.Errorf("GetMetricDataResults failed: %w", err)
}

// Find a timestamp for all metrics in output
Expand Down Expand Up @@ -526,7 +519,7 @@ func (m *MetricSet) createEvents(svcCloudwatch cloudwatchiface.ClientAPI, svcRes
resourceTagMap, err := aws.GetResourcesTags(svcResourceAPI, []string{resourceType})
if err != nil {
// If GetResourcesTags failed, continue report event just without tags.
m.logger.Info(errors.Wrap(err, "getResourcesTags failed, skipping region "+regionName))
m.logger.Info(fmt.Errorf("getResourcesTags failed, skipping region %s: %w", regionName, err))
}

if len(tagsFilter) != 0 && len(resourceTagMap) == 0 {
Expand Down Expand Up @@ -589,7 +582,7 @@ func (m *MetricSet) createEvents(svcCloudwatch cloudwatchiface.ClientAPI, svcRes
func reportEvents(eventsWithIdentifier map[string]mb.Event, report mb.ReporterV2) error {
for _, event := range eventsWithIdentifier {
if reported := report.Event(event); !reported {
return nil
return errors.New("report event failed")
}
}
return nil
Expand Down Expand Up @@ -647,7 +640,7 @@ func insertTags(events map[string]mb.Event, identifier string, resourceTagMap ma
// By default, replace dot "." using underscore "_" for tag keys.
// Note: tag values are not dedotted.
for _, tag := range tags {
events[identifier].RootFields.Put("aws.tags."+common.DeDot(*tag.Key), *tag.Value)
_, _ = events[identifier].RootFields.Put("aws.tags."+common.DeDot(*tag.Key), *tag.Value)
}
continue
}
Expand Down
101 changes: 100 additions & 1 deletion x-pack/metricbeat/module/aws/cloudwatch/cloudwatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package cloudwatch

import (
"errors"
"net/http"
"testing"
"time"
Expand All @@ -17,7 +18,6 @@ import (
"github.com/aws/aws-sdk-go-v2/service/cloudwatch/cloudwatchiface"
"github.com/aws/aws-sdk-go-v2/service/resourcegroupstaggingapi"
"github.com/aws/aws-sdk-go-v2/service/resourcegroupstaggingapi/resourcegroupstaggingapiiface"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"

"github.com/elastic/beats/v7/metricbeat/mb"
Expand Down Expand Up @@ -427,6 +427,52 @@ func TestReadCloudwatchConfig(t *testing.T) {
resourceTypeFilters: resourceTypeFiltersEC2,
}

expectedListMetricWithDetailEC2sRDSWithTag := listMetricWithDetail{
metricsWithStats: []metricsWithStatistics{
{
cloudwatch.Metric{
Dimensions: []cloudwatch.Dimension{{
Name: awssdk.String("InstanceId"),
Value: awssdk.String("i-1"),
}},
MetricName: awssdk.String("CPUUtilization"),
Namespace: awssdk.String("AWS/EC2"),
},
[]string{"Average"},
nil,
},
{
cloudwatch.Metric{
Dimensions: []cloudwatch.Dimension{{
Name: awssdk.String("InstanceId"),
Value: awssdk.String("i-2"),
}},
MetricName: awssdk.String("DiskReadBytes"),
Namespace: awssdk.String("AWS/EC2"),
},
[]string{"Sum"},
nil,
},
{
cloudwatch.Metric{
Dimensions: []cloudwatch.Dimension{{
Name: awssdk.String("DBClusterIdentifier"),
Value: awssdk.String("test1-cluster"),
},
{
Name: awssdk.String("Role"),
Value: awssdk.String("READER"),
}},
MetricName: awssdk.String("CommitThroughput"),
Namespace: awssdk.String("AWS/RDS"),
},
[]string{"Average"},
nil,
},
},
resourceTypeFilters: resourceTypeFiltersEC2RDSWithTag,
}

cases := []struct {
title string
cloudwatchMetricsConfig []Config
Expand Down Expand Up @@ -708,6 +754,59 @@ func TestReadCloudwatchConfig(t *testing.T) {
expectedListMetricsEC2WithDim,
map[string][]namespaceDetail{},
},
{
"Test with same namespace and tag filters but different metric names",
[]Config{
{
Namespace: "AWS/EC2",
MetricName: []string{"CPUUtilization"},
Dimensions: []Dimension{
{
Name: "InstanceId",
Value: "i-1",
},
},
Statistic: []string{"Average"},
ResourceType: "ec2:instance",
},
{
Namespace: "AWS/EC2",
MetricName: []string{"DiskReadBytes"},
Dimensions: []Dimension{
{
Name: "InstanceId",
Value: "i-2",
},
},
Statistic: []string{"Sum"},
ResourceType: "ec2:instance",
},
{
Namespace: "AWS/RDS",
MetricName: []string{"CommitThroughput"},
Dimensions: []Dimension{
{
Name: "DBClusterIdentifier",
Value: "test1-cluster",
},
{
Name: "Role",
Value: "READER",
},
},
Statistic: []string{"Average"},
ResourceType: "rds",
},
},
[]aws.Tag{
{
Key: "name",
Value: "test",
},
},
expectedListMetricWithDetailEC2sRDSWithTag,
map[string][]namespaceDetail{},
},
}

for _, c := range cases {
Expand Down

0 comments on commit 1755b5d

Please sign in to comment.