Skip to content

Commit

Permalink
Remove outputs blocking inputs when output is slow (influxdata#4938)
Browse files Browse the repository at this point in the history
  • Loading branch information
danielnelson authored and otherpirate committed Mar 15, 2019
1 parent 0577a51 commit c6e91bf
Show file tree
Hide file tree
Showing 59 changed files with 3,545 additions and 2,119 deletions.
500 changes: 36 additions & 464 deletions CONTRIBUTING.md

Large diffs are not rendered by default.

52 changes: 46 additions & 6 deletions accumulator.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
package telegraf

import "time"
import (
"time"
)

// Accumulator is an interface for "accumulating" metrics from plugin(s).
// The metrics are sent down a channel shared between all plugins.
// Accumulator allows adding metrics to the processing flow.
type Accumulator interface {
// AddFields adds a metric to the accumulator with the given measurement
// name, fields, and tags (and timestamp). If a timestamp is not provided,
// then the accumulator sets it to "now".
// Create a point with a value, decorating it with tags
// NOTE: tags is expected to be owned by the caller, don't mutate
// it after passing to Add.
AddFields(measurement string,
fields map[string]interface{},
tags map[string]string,
Expand Down Expand Up @@ -40,7 +38,49 @@ type Accumulator interface {
tags map[string]string,
t ...time.Time)

// AddMetric adds an metric to the accumulator.
AddMetric(Metric)

// SetPrecision takes two time.Duration objects. If the first is non-zero,
// it sets that as the precision. Otherwise, it takes the second argument
// as the order of time that the metrics should be rounded to, with the
// maximum being 1s.
SetPrecision(precision, interval time.Duration)

// Report an error.
AddError(err error)

// Upgrade to a TrackingAccumulator with space for maxTracked
// metrics/batches.
WithTracking(maxTracked int) TrackingAccumulator
}

// TrackingID uniquely identifies a tracked metric group
type TrackingID uint64

// DeliveryInfo provides the results of a delivered metric group.
type DeliveryInfo interface {
// ID is the TrackingID
ID() TrackingID

// Delivered returns true if the metric was processed successfully.
Delivered() bool
}

// TrackingAccumulator is an Accumulator that provides a signal when the
// metric has been fully processed. Sending more metrics than the accumulator
// has been allocated for without reading status from the Accepted or Rejected
// channels is an error.
type TrackingAccumulator interface {
Accumulator

// Add the Metric and arrange for tracking feedback after processing..
AddTrackingMetric(m Metric) TrackingID

// Add a group of Metrics and arrange for a signal when the group has been
// processed.
AddTrackingMetricGroup(group []Metric) TrackingID

// Delivered returns a channel that will contain the tracking results.
Delivered() <-chan DeliveryInfo
}
70 changes: 56 additions & 14 deletions agent/accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ type MetricMaker interface {

type accumulator struct {
maker MetricMaker
metrics chan telegraf.Metric
metrics chan<- telegraf.Metric
precision time.Duration
}

func NewAccumulator(
maker MetricMaker,
metrics chan telegraf.Metric,
metrics chan<- telegraf.Metric,
) telegraf.Accumulator {
acc := accumulator{
maker: maker,
Expand All @@ -42,7 +42,7 @@ func (ac *accumulator) AddFields(
tags map[string]string,
t ...time.Time,
) {
ac.addMetric(measurement, tags, fields, telegraf.Untyped, t...)
ac.addFields(measurement, tags, fields, telegraf.Untyped, t...)
}

func (ac *accumulator) AddGauge(
Expand All @@ -51,7 +51,7 @@ func (ac *accumulator) AddGauge(
tags map[string]string,
t ...time.Time,
) {
ac.addMetric(measurement, tags, fields, telegraf.Gauge, t...)
ac.addFields(measurement, tags, fields, telegraf.Gauge, t...)
}

func (ac *accumulator) AddCounter(
Expand All @@ -60,7 +60,7 @@ func (ac *accumulator) AddCounter(
tags map[string]string,
t ...time.Time,
) {
ac.addMetric(measurement, tags, fields, telegraf.Counter, t...)
ac.addFields(measurement, tags, fields, telegraf.Counter, t...)
}

func (ac *accumulator) AddSummary(
Expand All @@ -69,7 +69,7 @@ func (ac *accumulator) AddSummary(
tags map[string]string,
t ...time.Time,
) {
ac.addMetric(measurement, tags, fields, telegraf.Summary, t...)
ac.addFields(measurement, tags, fields, telegraf.Summary, t...)
}

func (ac *accumulator) AddHistogram(
Expand All @@ -78,10 +78,16 @@ func (ac *accumulator) AddHistogram(
tags map[string]string,
t ...time.Time,
) {
ac.addMetric(measurement, tags, fields, telegraf.Histogram, t...)
ac.addFields(measurement, tags, fields, telegraf.Histogram, t...)
}

func (ac *accumulator) addMetric(
func (ac *accumulator) AddMetric(m telegraf.Metric) {
if m := ac.maker.MakeMetric(m); m != nil {
ac.metrics <- m
}
}

func (ac *accumulator) addFields(
measurement string,
tags map[string]string,
fields map[string]interface{},
Expand All @@ -104,13 +110,9 @@ func (ac *accumulator) AddError(err error) {
return
}
NErrors.Incr(1)
log.Printf("E! Error in plugin [%s]: %s", ac.maker.Name(), err)
log.Printf("E! [%s]: Error in plugin: %v", ac.maker.Name(), err)
}

// SetPrecision takes two time.Duration objects. If the first is non-zero,
// it sets that as the precision. Otherwise, it takes the second argument
// as the order of time that the metrics should be rounded to, with the
// maximum being 1s.
func (ac *accumulator) SetPrecision(precision, interval time.Duration) {
if precision > 0 {
ac.precision = precision
Expand All @@ -128,7 +130,7 @@ func (ac *accumulator) SetPrecision(precision, interval time.Duration) {
}
}

func (ac accumulator) getTime(t []time.Time) time.Time {
func (ac *accumulator) getTime(t []time.Time) time.Time {
var timestamp time.Time
if len(t) > 0 {
timestamp = t[0]
Expand All @@ -137,3 +139,43 @@ func (ac accumulator) getTime(t []time.Time) time.Time {
}
return timestamp.Round(ac.precision)
}

func (ac *accumulator) WithTracking(maxTracked int) telegraf.TrackingAccumulator {
return &trackingAccumulator{
Accumulator: ac,
delivered: make(chan telegraf.DeliveryInfo, maxTracked),
}
}

type trackingAccumulator struct {
telegraf.Accumulator
delivered chan telegraf.DeliveryInfo
}

func (a *trackingAccumulator) AddTrackingMetric(m telegraf.Metric) telegraf.TrackingID {
dm, id := metric.WithTracking(m, a.onDelivery)
a.AddMetric(dm)
return id
}

func (a *trackingAccumulator) AddTrackingMetricGroup(group []telegraf.Metric) telegraf.TrackingID {
db, id := metric.WithGroupTracking(group, a.onDelivery)
for _, m := range db {
a.AddMetric(m)
}
return id
}

func (a *trackingAccumulator) Delivered() <-chan telegraf.DeliveryInfo {
return a.delivered
}

func (a *trackingAccumulator) onDelivery(info telegraf.DeliveryInfo) {
select {
case a.delivered <- info:
default:
// This is a programming error in the input. More items were sent for
// tracking than space requested.
panic("channel is full")
}
}
Loading

0 comments on commit c6e91bf

Please sign in to comment.