Skip to content

Commit

Permalink
drop metrics outside of the aggregators period
Browse files Browse the repository at this point in the history
  • Loading branch information
sparrc committed Oct 10, 2016
1 parent 2f232ac commit e468300
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 3 deletions.
12 changes: 12 additions & 0 deletions internal/models/running_aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ type RunningAggregator struct {
Config *AggregatorConfig

metrics chan telegraf.Metric

periodStart time.Time
periodEnd time.Time
}

func NewRunningAggregator(
Expand Down Expand Up @@ -109,6 +112,8 @@ func (r *RunningAggregator) Run(
acc telegraf.Accumulator,
shutdown chan struct{},
) {
r.periodStart = time.Now()
r.periodEnd = r.periodStart.Add(r.Config.Period + r.Config.Delay)
time.Sleep(r.Config.Delay)
periodT := time.NewTicker(r.Config.Period)
defer periodT.Stop()
Expand All @@ -122,8 +127,15 @@ func (r *RunningAggregator) Run(
}
return
case m := <-r.metrics:
if m.Time().Before(r.periodStart) || m.Time().After(r.periodEnd) {
// the metric is outside the current aggregation period, so
// skip it.
continue
}
r.add(m)
case <-periodT.C:
r.periodStart = r.periodEnd.Add(-r.Config.Delay)
r.periodEnd = r.periodStart.Add(r.Config.Period + r.Config.Delay)
r.push(acc)
r.reset()
}
Expand Down
62 changes: 59 additions & 3 deletions internal/models/running_aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,64 @@ func TestAdd(t *testing.T) {
map[string]interface{}{"value": int(101)},
map[string]string{},
telegraf.Untyped,
time.Now(),
time.Now().Add(time.Millisecond*150),
)
assert.False(t, ra.Add(m))

for {
time.Sleep(time.Millisecond)
if atomic.LoadInt64(&a.sum) > 0 {
break
}
}
assert.Equal(t, int64(101), atomic.LoadInt64(&a.sum))
}

func TestAddMetricsOutsideCurrentPeriod(t *testing.T) {
a := &TestAggregator{}
ra := NewRunningAggregator(a, &AggregatorConfig{
Name: "TestRunningAggregator",
Filter: Filter{
NamePass: []string{"*"},
},
Period: time.Millisecond * 500,
})
assert.NoError(t, ra.Config.Filter.Compile())
acc := testutil.Accumulator{}
go ra.Run(&acc, make(chan struct{}))

// metric before current period
m := ra.MakeMetric(
"RITest",
map[string]interface{}{"value": int(101)},
map[string]string{},
telegraf.Untyped,
time.Now().Add(-time.Hour),
)
assert.False(t, ra.Add(m))

// metric after current period
m = ra.MakeMetric(
"RITest",
map[string]interface{}{"value": int(101)},
map[string]string{},
telegraf.Untyped,
time.Now().Add(time.Hour),
)
assert.False(t, ra.Add(m))

// "now" metric
m = ra.MakeMetric(
"RITest",
map[string]interface{}{"value": int(101)},
map[string]string{},
telegraf.Untyped,
time.Now().Add(time.Millisecond*50),
)
assert.False(t, ra.Add(m))

for {
time.Sleep(time.Millisecond)
if atomic.LoadInt64(&a.sum) > 0 {
break
}
Expand Down Expand Up @@ -68,11 +121,12 @@ func TestAddAndPushOnePeriod(t *testing.T) {
map[string]interface{}{"value": int(101)},
map[string]string{},
telegraf.Untyped,
time.Now(),
time.Now().Add(time.Millisecond*100),
)
assert.False(t, ra.Add(m))

for {
time.Sleep(time.Millisecond)
if acc.NMetrics() > 0 {
break
}
Expand Down Expand Up @@ -182,7 +236,9 @@ type TestAggregator struct {

func (t *TestAggregator) Description() string { return "" }
func (t *TestAggregator) SampleConfig() string { return "" }
func (t *TestAggregator) Reset() {}
func (t *TestAggregator) Reset() {
atomic.StoreInt64(&t.sum, 0)
}

func (t *TestAggregator) Push(acc telegraf.Accumulator) {
acc.AddFields("TestMetric",
Expand Down

0 comments on commit e468300

Please sign in to comment.