Skip to content

Commit

Permalink
drop metrics outside of the aggregators period
Browse files Browse the repository at this point in the history
  • Loading branch information
sparrc committed Oct 10, 2016
1 parent 2f232ac commit 735a564
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 4 deletions.
4 changes: 3 additions & 1 deletion docs/CONFIGURATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,9 @@ There are no generic configuration options available for all outputs.

The following config parameters are available for all aggregators:

* **period**: The period on which to flush & clear each aggregator.
* **period**: The period on which to flush & clear each aggregator. All metrics
that are sent with timestamps outside of this period will be ignored by the
aggregator.
* **delay**: The delay before each aggregator is flushed. This is to control
how long for aggregators to wait before receiving metrics from input plugins,
in the case that aggregators are flushing and inputs are gathering on the
Expand Down
12 changes: 12 additions & 0 deletions internal/models/running_aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ type RunningAggregator struct {
Config *AggregatorConfig

metrics chan telegraf.Metric

periodStart time.Time
periodEnd time.Time
}

func NewRunningAggregator(
Expand Down Expand Up @@ -109,6 +112,8 @@ func (r *RunningAggregator) Run(
acc telegraf.Accumulator,
shutdown chan struct{},
) {
r.periodStart = time.Now()
r.periodEnd = r.periodStart.Add(r.Config.Period + r.Config.Delay)
time.Sleep(r.Config.Delay)
periodT := time.NewTicker(r.Config.Period)
defer periodT.Stop()
Expand All @@ -122,8 +127,15 @@ func (r *RunningAggregator) Run(
}
return
case m := <-r.metrics:
if m.Time().Before(r.periodStart) || m.Time().After(r.periodEnd) {
// the metric is outside the current aggregation period, so
// skip it.
continue
}
r.add(m)
case <-periodT.C:
r.periodStart = r.periodEnd.Add(-r.Config.Delay)
r.periodEnd = r.periodStart.Add(r.Config.Period + r.Config.Delay)
r.push(acc)
r.reset()
}
Expand Down
62 changes: 59 additions & 3 deletions internal/models/running_aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,64 @@ func TestAdd(t *testing.T) {
map[string]interface{}{"value": int(101)},
map[string]string{},
telegraf.Untyped,
time.Now(),
time.Now().Add(time.Millisecond*150),
)
assert.False(t, ra.Add(m))

for {
time.Sleep(time.Millisecond)
if atomic.LoadInt64(&a.sum) > 0 {
break
}
}
assert.Equal(t, int64(101), atomic.LoadInt64(&a.sum))
}

func TestAddMetricsOutsideCurrentPeriod(t *testing.T) {
a := &TestAggregator{}
ra := NewRunningAggregator(a, &AggregatorConfig{
Name: "TestRunningAggregator",
Filter: Filter{
NamePass: []string{"*"},
},
Period: time.Millisecond * 500,
})
assert.NoError(t, ra.Config.Filter.Compile())
acc := testutil.Accumulator{}
go ra.Run(&acc, make(chan struct{}))

// metric before current period
m := ra.MakeMetric(
"RITest",
map[string]interface{}{"value": int(101)},
map[string]string{},
telegraf.Untyped,
time.Now().Add(-time.Hour),
)
assert.False(t, ra.Add(m))

// metric after current period
m = ra.MakeMetric(
"RITest",
map[string]interface{}{"value": int(101)},
map[string]string{},
telegraf.Untyped,
time.Now().Add(time.Hour),
)
assert.False(t, ra.Add(m))

// "now" metric
m = ra.MakeMetric(
"RITest",
map[string]interface{}{"value": int(101)},
map[string]string{},
telegraf.Untyped,
time.Now().Add(time.Millisecond*50),
)
assert.False(t, ra.Add(m))

for {
time.Sleep(time.Millisecond)
if atomic.LoadInt64(&a.sum) > 0 {
break
}
Expand Down Expand Up @@ -68,11 +121,12 @@ func TestAddAndPushOnePeriod(t *testing.T) {
map[string]interface{}{"value": int(101)},
map[string]string{},
telegraf.Untyped,
time.Now(),
time.Now().Add(time.Millisecond*100),
)
assert.False(t, ra.Add(m))

for {
time.Sleep(time.Millisecond)
if acc.NMetrics() > 0 {
break
}
Expand Down Expand Up @@ -182,7 +236,9 @@ type TestAggregator struct {

func (t *TestAggregator) Description() string { return "" }
func (t *TestAggregator) SampleConfig() string { return "" }
func (t *TestAggregator) Reset() {}
func (t *TestAggregator) Reset() {
atomic.StoreInt64(&t.sum, 0)
}

func (t *TestAggregator) Push(acc telegraf.Accumulator) {
acc.AddFields("TestMetric",
Expand Down

0 comments on commit 735a564

Please sign in to comment.