Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Align metrics windows to gather interval for cloudwatch input #4667

Merged
merged 1 commit into from
Sep 11, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 28 additions & 8 deletions plugins/inputs/cloudwatch/cloudwatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ type (
RateLimit int `toml:"ratelimit"`
client cloudwatchClient
metricCache *MetricCache
windowStart time.Time
windowEnd time.Time
}

Metric struct {
Expand Down Expand Up @@ -197,6 +199,11 @@ func (c *CloudWatch) Gather(acc telegraf.Accumulator) error {

now := time.Now()

err = c.updateWindow(now)
if err != nil {
return err
}

// limit concurrency or we can easily exhaust user connection limit
// see cloudwatch API request limits:
// http://docs.aws.amazon.com/AmazonCloudWatch/latest/DeveloperGuide/cloudwatch_limits.html
Expand All @@ -208,14 +215,30 @@ func (c *CloudWatch) Gather(acc telegraf.Accumulator) error {
<-lmtr.C
go func(inm *cloudwatch.Metric) {
defer wg.Done()
acc.AddError(c.gatherMetric(acc, inm, now))
acc.AddError(c.gatherMetric(acc, inm))
}(m)
}
wg.Wait()

return nil
}

func (c *CloudWatch) updateWindow(relativeTo time.Time) error {
windowEnd := relativeTo.Add(-c.Delay.Duration)

if c.windowEnd.IsZero() {
// this is the first run, no window info, so just get a single period
c.windowStart = windowEnd.Add(-c.Period.Duration)
} else {
// subsequent window, start where last window left off
c.windowStart = c.windowEnd
}

c.windowEnd = windowEnd

return nil
}

func init() {
inputs.Add("cloudwatch", func() telegraf.Input {
ttl, _ := time.ParseDuration("1hr")
Expand Down Expand Up @@ -291,9 +314,8 @@ func (c *CloudWatch) fetchNamespaceMetrics() ([]*cloudwatch.Metric, error) {
func (c *CloudWatch) gatherMetric(
acc telegraf.Accumulator,
metric *cloudwatch.Metric,
now time.Time,
) error {
params := c.getStatisticsInput(metric, now)
params := c.getStatisticsInput(metric)
resp, err := c.client.GetMetricStatistics(params)
if err != nil {
return err
Expand Down Expand Up @@ -356,12 +378,10 @@ func snakeCase(s string) string {
/*
* Map Metric to *cloudwatch.GetMetricStatisticsInput for given timeframe
*/
func (c *CloudWatch) getStatisticsInput(metric *cloudwatch.Metric, now time.Time) *cloudwatch.GetMetricStatisticsInput {
end := now.Add(-c.Delay.Duration)

func (c *CloudWatch) getStatisticsInput(metric *cloudwatch.Metric) *cloudwatch.GetMetricStatisticsInput {
input := &cloudwatch.GetMetricStatisticsInput{
StartTime: aws.Time(end.Add(-c.Period.Duration)),
EndTime: aws.Time(end),
StartTime: aws.Time(c.windowStart),
EndTime: aws.Time(c.windowEnd),
MetricName: metric.MetricName,
Namespace: metric.Namespace,
Period: aws.Int64(int64(c.Period.Duration.Seconds())),
Expand Down
37 changes: 36 additions & 1 deletion plugins/inputs/cloudwatch/cloudwatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,9 @@ func TestGenerateStatisticsInputParams(t *testing.T) {

now := time.Now()

params := c.getStatisticsInput(m, now)
c.updateWindow(now)

params := c.getStatisticsInput(m)

assert.EqualValues(t, *params.EndTime, now.Add(-c.Delay.Duration))
assert.EqualValues(t, *params.StartTime, now.Add(-c.Period.Duration).Add(-c.Delay.Duration))
Expand All @@ -217,3 +219,36 @@ func TestMetricsCacheTimeout(t *testing.T) {
cache.Fetched = time.Now().Add(-time.Minute)
assert.False(t, cache.IsValid())
}

func TestUpdateWindow(t *testing.T) {
duration, _ := time.ParseDuration("1m")
internalDuration := internal.Duration{
Duration: duration,
}

c := &CloudWatch{
Namespace: "AWS/ELB",
Delay: internalDuration,
Period: internalDuration,
}

now := time.Now()

assert.True(t, c.windowEnd.IsZero())
assert.True(t, c.windowStart.IsZero())

c.updateWindow(now)

newStartTime := c.windowEnd

// initial window just has a single period
assert.EqualValues(t, c.windowEnd, now.Add(-c.Delay.Duration))
assert.EqualValues(t, c.windowStart, now.Add(-c.Delay.Duration).Add(-c.Period.Duration))

now = time.Now()
c.updateWindow(now)

// subsequent window uses previous end time as start time
assert.EqualValues(t, c.windowEnd, now.Add(-c.Delay.Duration))
assert.EqualValues(t, c.windowStart, newStartTime)
}