Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Pull metrics from multiple AWS CloudWatch namespaces #9386

Merged
merged 8 commits into from
Aug 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions plugins/inputs/cloudwatch/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,10 @@ API endpoint. In the following order the plugin will attempt to authenticate.
## Configure the TTL for the internal cache of metrics.
# cache_ttl = "1h"

## Metric Statistic Namespace (required)
namespace = "AWS/ELB"
## Metric Statistic Namespaces (required)
namespaces = ["AWS/ELB"]
sspaink marked this conversation as resolved.
Show resolved Hide resolved
## A single metric statistic namespace (deprecated) that will be appended to the "namespaces" list on startup
# namespace = "AWS/ELB"

## Maximum requests per second. Note that the global default AWS rate limit is
## 50 reqs/sec, so if you define multiple namespaces, these should add up to a
Expand Down
179 changes: 98 additions & 81 deletions plugins/inputs/cloudwatch/cloudwatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type CloudWatch struct {
Period config.Duration `toml:"period"`
Delay config.Duration `toml:"delay"`
Namespace string `toml:"namespace"`
Namespaces []string `toml:"namespaces"`
Metrics []*Metric `toml:"metrics"`
CacheTTL config.Duration `toml:"cache_ttl"`
RateLimit int `toml:"ratelimit"`
Expand Down Expand Up @@ -71,7 +72,7 @@ type metricCache struct {
ttl time.Duration
built time.Time
metrics []filteredMetric
queries []*cwClient.MetricDataQuery
queries map[string][]*cwClient.MetricDataQuery
}

type cloudwatchClient interface {
Expand Down Expand Up @@ -139,8 +140,10 @@ func (c *CloudWatch) SampleConfig() string {
## Configure the TTL for the internal cache of metrics.
# cache_ttl = "1h"

## Metric Statistic Namespace (required)
namespace = "AWS/ELB"
## Metric Statistic Namespaces (required)
namespaces = ["AWS/ELB"]
akrantz01 marked this conversation as resolved.
Show resolved Hide resolved
# A single metric statistic namespace that will be appended to namespaces on startup
# namespace = "AWS/ELB"

## Maximum requests per second. Note that the global default AWS rate limit is
## 50 reqs/sec, so if you define multiple namespaces, these should add up to a
Expand Down Expand Up @@ -181,25 +184,28 @@ func (c *CloudWatch) Description() string {
return "Pull Metric Statistics from Amazon CloudWatch"
}

// Gather takes in an accumulator and adds the metrics that the Input
// gathers. This is called every "interval".
func (c *CloudWatch) Gather(acc telegraf.Accumulator) error {
if c.statFilter == nil {
var err error
// Set config level filter (won't change throughout life of plugin).
c.statFilter, err = filter.NewIncludeExcludeFilter(c.StatisticInclude, c.StatisticExclude)
if err != nil {
return err
}
func (c *CloudWatch) Init() error {
if len(c.Namespace) != 0 {
akrantz01 marked this conversation as resolved.
Show resolved Hide resolved
c.Namespaces = append(c.Namespaces, c.Namespace)
}

if c.client == nil {
err := c.initializeCloudWatch()
if err != nil {
return err
}
err := c.initializeCloudWatch()
if err != nil {
return err
}

// Set config level filter (won't change throughout life of plugin).
c.statFilter, err = filter.NewIncludeExcludeFilter(c.StatisticInclude, c.StatisticExclude)
if err != nil {
return err
}

return nil
}

// Gather takes in an accumulator and adds the metrics that the Input
// gathers. This is called every "interval".
func (c *CloudWatch) Gather(acc telegraf.Accumulator) error {
filteredMetrics, err := getFilteredMetrics(c)
if err != nil {
return err
Expand All @@ -221,32 +227,34 @@ func (c *CloudWatch) Gather(acc telegraf.Accumulator) error {
wg := sync.WaitGroup{}
rLock := sync.Mutex{}

results := []*cwClient.MetricDataResult{}
results := map[string][]*cwClient.MetricDataResult{}

// 500 is the maximum number of metric data queries a `GetMetricData` request can contain.
batchSize := 500
var batches [][]*cwClient.MetricDataQuery
for namespace, namespacedQueries := range queries {
// 500 is the maximum number of metric data queries a `GetMetricData` request can contain.
batchSize := 500
var batches [][]*cwClient.MetricDataQuery

for batchSize < len(queries) {
queries, batches = queries[batchSize:], append(batches, queries[0:batchSize:batchSize])
}
batches = append(batches, queries)

for i := range batches {
wg.Add(1)
<-lmtr.C
go func(inm []*cwClient.MetricDataQuery) {
defer wg.Done()
result, err := c.gatherMetrics(c.getDataInputs(inm))
if err != nil {
acc.AddError(err)
return
}
for batchSize < len(namespacedQueries) {
namespacedQueries, batches = namespacedQueries[batchSize:], append(batches, namespacedQueries[0:batchSize:batchSize])
}
batches = append(batches, namespacedQueries)

for i := range batches {
wg.Add(1)
<-lmtr.C
go func(n string, inm []*cwClient.MetricDataQuery) {
defer wg.Done()
result, err := c.gatherMetrics(c.getDataInputs(inm))
if err != nil {
acc.AddError(err)
return
}

rLock.Lock()
results = append(results, result...)
rLock.Unlock()
}(batches[i])
rLock.Lock()
results[n] = append(results[n], result...)
rLock.Unlock()
}(namespace, batches[i])
}
}

wg.Wait()
Expand Down Expand Up @@ -323,11 +331,13 @@ func getFilteredMetrics(c *CloudWatch) ([]filteredMetric, error) {
}
}
for _, name := range m.MetricNames {
metrics = append(metrics, &cwClient.Metric{
Namespace: aws.String(c.Namespace),
MetricName: aws.String(name),
Dimensions: dimensions,
})
for _, namespace := range c.Namespaces {
metrics = append(metrics, &cwClient.Metric{
Namespace: aws.String(namespace),
MetricName: aws.String(name),
Dimensions: dimensions,
})
}
}
} else {
allMetrics, err := c.fetchNamespaceMetrics()
Expand All @@ -337,11 +347,13 @@ func getFilteredMetrics(c *CloudWatch) ([]filteredMetric, error) {
for _, name := range m.MetricNames {
for _, metric := range allMetrics {
if isSelected(name, metric, m.Dimensions) {
metrics = append(metrics, &cwClient.Metric{
Namespace: aws.String(c.Namespace),
MetricName: aws.String(name),
Dimensions: metric.Dimensions,
})
for _, namespace := range c.Namespaces {
metrics = append(metrics, &cwClient.Metric{
Namespace: aws.String(namespace),
MetricName: aws.String(name),
Dimensions: metric.Dimensions,
})
}
}
}
}
Expand Down Expand Up @@ -399,24 +411,26 @@ func (c *CloudWatch) fetchNamespaceMetrics() ([]*cwClient.Metric, error) {
recentlyActive = nil
}
params = &cwClient.ListMetricsInput{
Namespace: aws.String(c.Namespace),
Dimensions: []*cwClient.DimensionFilter{},
NextToken: token,
MetricName: nil,
RecentlyActive: recentlyActive,
}
for {
resp, err := c.client.ListMetrics(params)
if err != nil {
return nil, err
}
for _, namespace := range c.Namespaces {
params.Namespace = aws.String(namespace)
for {
resp, err := c.client.ListMetrics(params)
if err != nil {
return nil, err
}

metrics = append(metrics, resp.Metrics...)
if resp.NextToken == nil {
break
}
metrics = append(metrics, resp.Metrics...)
if resp.NextToken == nil {
break
}

params.NextToken = resp.NextToken
params.NextToken = resp.NextToken
}
}

return metrics, nil
Expand All @@ -437,21 +451,21 @@ func (c *CloudWatch) updateWindow(relativeTo time.Time) {
}

// getDataQueries gets all of the possible queries so we can maximize the request payload.
func (c *CloudWatch) getDataQueries(filteredMetrics []filteredMetric) []*cwClient.MetricDataQuery {
func (c *CloudWatch) getDataQueries(filteredMetrics []filteredMetric) map[string][]*cwClient.MetricDataQuery {
if c.metricCache != nil && c.metricCache.queries != nil && c.metricCache.isValid() {
return c.metricCache.queries
}

c.queryDimensions = map[string]*map[string]string{}

dataQueries := []*cwClient.MetricDataQuery{}
dataQueries := map[string][]*cwClient.MetricDataQuery{}
for i, filtered := range filteredMetrics {
for j, metric := range filtered.metrics {
id := strconv.Itoa(j) + "_" + strconv.Itoa(i)
dimension := ctod(metric.Dimensions)
if filtered.statFilter.Match("average") {
c.queryDimensions["average_"+id] = dimension
dataQueries = append(dataQueries, &cwClient.MetricDataQuery{
dataQueries[*metric.Namespace] = append(dataQueries[*metric.Namespace], &cwClient.MetricDataQuery{
Id: aws.String("average_" + id),
Label: aws.String(snakeCase(*metric.MetricName + "_average")),
MetricStat: &cwClient.MetricStat{
Expand All @@ -463,7 +477,7 @@ func (c *CloudWatch) getDataQueries(filteredMetrics []filteredMetric) []*cwClien
}
if filtered.statFilter.Match("maximum") {
c.queryDimensions["maximum_"+id] = dimension
dataQueries = append(dataQueries, &cwClient.MetricDataQuery{
dataQueries[*metric.Namespace] = append(dataQueries[*metric.Namespace], &cwClient.MetricDataQuery{
Id: aws.String("maximum_" + id),
Label: aws.String(snakeCase(*metric.MetricName + "_maximum")),
MetricStat: &cwClient.MetricStat{
Expand All @@ -475,7 +489,7 @@ func (c *CloudWatch) getDataQueries(filteredMetrics []filteredMetric) []*cwClien
}
if filtered.statFilter.Match("minimum") {
c.queryDimensions["minimum_"+id] = dimension
dataQueries = append(dataQueries, &cwClient.MetricDataQuery{
dataQueries[*metric.Namespace] = append(dataQueries[*metric.Namespace], &cwClient.MetricDataQuery{
Id: aws.String("minimum_" + id),
Label: aws.String(snakeCase(*metric.MetricName + "_minimum")),
MetricStat: &cwClient.MetricStat{
Expand All @@ -487,7 +501,7 @@ func (c *CloudWatch) getDataQueries(filteredMetrics []filteredMetric) []*cwClien
}
if filtered.statFilter.Match("sum") {
c.queryDimensions["sum_"+id] = dimension
dataQueries = append(dataQueries, &cwClient.MetricDataQuery{
dataQueries[*metric.Namespace] = append(dataQueries[*metric.Namespace], &cwClient.MetricDataQuery{
Id: aws.String("sum_" + id),
Label: aws.String(snakeCase(*metric.MetricName + "_sum")),
MetricStat: &cwClient.MetricStat{
Expand All @@ -499,7 +513,7 @@ func (c *CloudWatch) getDataQueries(filteredMetrics []filteredMetric) []*cwClien
}
if filtered.statFilter.Match("sample_count") {
c.queryDimensions["sample_count_"+id] = dimension
dataQueries = append(dataQueries, &cwClient.MetricDataQuery{
dataQueries[*metric.Namespace] = append(dataQueries[*metric.Namespace], &cwClient.MetricDataQuery{
Id: aws.String("sample_count_" + id),
Label: aws.String(snakeCase(*metric.MetricName + "_sample_count")),
MetricStat: &cwClient.MetricStat{
Expand Down Expand Up @@ -554,24 +568,27 @@ func (c *CloudWatch) gatherMetrics(

func (c *CloudWatch) aggregateMetrics(
acc telegraf.Accumulator,
metricDataResults []*cwClient.MetricDataResult,
metricDataResults map[string][]*cwClient.MetricDataResult,
) error {
var (
grouper = internalMetric.NewSeriesGrouper()
namespace = sanitizeMeasurement(c.Namespace)
grouper = internalMetric.NewSeriesGrouper()
)

for _, result := range metricDataResults {
tags := map[string]string{}
for namespace, results := range metricDataResults {
namespace = sanitizeMeasurement(namespace)

if dimensions, ok := c.queryDimensions[*result.Id]; ok {
tags = *dimensions
}
tags["region"] = c.Region
for _, result := range results {
tags := map[string]string{}

for i := range result.Values {
if err := grouper.Add(namespace, tags, *result.Timestamps[i], *result.Label, *result.Values[i]); err != nil {
acc.AddError(err)
if dimensions, ok := c.queryDimensions[*result.Id]; ok {
tags = *dimensions
}
tags["region"] = c.Region

for i := range result.Values {
if err := grouper.Add(namespace, tags, *result.Timestamps[i], *result.Label, *result.Values[i]); err != nil {
acc.AddError(err)
}
}
}
}
Expand Down
Loading