Skip to content

Commit

Permalink
Add grace period for metrics late for aggregation (influxdata#6049)
Browse files Browse the repository at this point in the history
  • Loading branch information
pberlowski authored and idohalevi committed Sep 23, 2020
1 parent 6b9b2bb commit c7badfe
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 3 deletions.
4 changes: 4 additions & 0 deletions docs/CONFIGURATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,10 @@ Parameters that can be used with any aggregator plugin:
how long for aggregators to wait before receiving metrics from input
plugins, in the case that aggregators are flushing and inputs are gathering
on the same interval.
- **grace**: The duration when the metrics will still be aggregated
by the plugin, even though they're outside of the aggregation period. This
is needed in a situation when the agent is expected to receive late metrics
and it's acceptable to roll them up into next aggregation period.
- **drop_original**: If true, the original metric will be dropped by the
aggregator and will not get sent to the output plugins.
- **name_override**: Override the base name of the measurement. (Default is
Expand Down
14 changes: 14 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1025,6 +1025,7 @@ func buildAggregator(name string, tbl *ast.Table) (*models.AggregatorConfig, err
Name: name,
Delay: time.Millisecond * 100,
Period: time.Second * 30,
Grace: time.Second * 0,
}

if node, ok := tbl.Fields["period"]; ok {
Expand Down Expand Up @@ -1053,6 +1054,18 @@ func buildAggregator(name string, tbl *ast.Table) (*models.AggregatorConfig, err
}
}

if node, ok := tbl.Fields["grace"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
dur, err := time.ParseDuration(str.Value)
if err != nil {
return nil, err
}

conf.Grace = dur
}
}
}
if node, ok := tbl.Fields["drop_original"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if b, ok := kv.Value.(*ast.Boolean); ok {
Expand Down Expand Up @@ -1100,6 +1113,7 @@ func buildAggregator(name string, tbl *ast.Table) (*models.AggregatorConfig, err

delete(tbl.Fields, "period")
delete(tbl.Fields, "delay")
delete(tbl.Fields, "grace")
delete(tbl.Fields, "drop_original")
delete(tbl.Fields, "name_prefix")
delete(tbl.Fields, "name_suffix")
Expand Down
7 changes: 4 additions & 3 deletions internal/models/running_aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type AggregatorConfig struct {
DropOriginal bool
Period time.Duration
Delay time.Duration
Grace time.Duration

NameOverride string
MeasurementPrefix string
Expand Down Expand Up @@ -135,9 +136,9 @@ func (r *RunningAggregator) Add(m telegraf.Metric) bool {
r.Lock()
defer r.Unlock()

if m.Time().Before(r.periodStart) || m.Time().After(r.periodEnd.Add(r.Config.Delay)) {
log.Printf("D! [%s] metric is outside aggregation window; discarding. %s: m: %s e: %s",
r.Name(), m.Time(), r.periodStart, r.periodEnd)
if m.Time().Before(r.periodStart.Add(-r.Config.Grace)) || m.Time().After(r.periodEnd.Add(r.Config.Delay)) {
log.Printf("D! [%s] metric is outside aggregation window; discarding. %s: m: %s e: %s g: %s",
r.Name(), m.Time(), r.periodStart, r.periodEnd, r.Config.Grace)
r.MetricsDropped.Incr(1)
return r.Config.DropOriginal
}
Expand Down
62 changes: 62 additions & 0 deletions internal/models/running_aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,68 @@ func TestAddMetricsOutsideCurrentPeriod(t *testing.T) {
require.Equal(t, int64(101), acc.Metrics[0].Fields["sum"])
}

func TestAddMetricsOutsideCurrentPeriodWithGrace(t *testing.T) {
a := &TestAggregator{}
ra := NewRunningAggregator(a, &AggregatorConfig{
Name: "TestRunningAggregator",
Filter: Filter{
NamePass: []string{"*"},
},
Period: time.Millisecond * 1500,
Grace: time.Millisecond * 500,
})
require.NoError(t, ra.Config.Filter.Compile())
acc := testutil.Accumulator{}
now := time.Now()
ra.UpdateWindow(now, now.Add(ra.Config.Period))

m := testutil.MustMetric("RITest",
map[string]string{},
map[string]interface{}{
"value": int64(101),
},
now.Add(-time.Hour),
telegraf.Untyped,
)
require.False(t, ra.Add(m))

// metric before current period (late)
m = testutil.MustMetric("RITest",
map[string]string{},
map[string]interface{}{
"value": int64(100),
},
now.Add(-time.Millisecond*1000),
telegraf.Untyped,
)
require.False(t, ra.Add(m))

// metric before current period, but within grace period (late)
m = testutil.MustMetric("RITest",
map[string]string{},
map[string]interface{}{
"value": int64(102),
},
now.Add(-time.Millisecond*200),
telegraf.Untyped,
)
require.False(t, ra.Add(m))

// "now" metric
m = testutil.MustMetric("RITest",
map[string]string{},
map[string]interface{}{
"value": int64(101),
},
time.Now().Add(time.Millisecond*50),
telegraf.Untyped)
require.False(t, ra.Add(m))

ra.Push(&acc)
require.Equal(t, 1, len(acc.Metrics))
require.Equal(t, int64(203), acc.Metrics[0].Fields["sum"])
}

func TestAddAndPushOnePeriod(t *testing.T) {
a := &TestAggregator{}
ra := NewRunningAggregator(a, &AggregatorConfig{
Expand Down

0 comments on commit c7badfe

Please sign in to comment.