From 91aaa997efcc8de56342f43fba6454ea682d6139 Mon Sep 17 00:00:00 2001
From: Cameron Sparr
Date: Mon, 15 Feb 2016 17:21:38 -0700
Subject: [PATCH] Add `Accumulator` to the ServiceInput Start() function

closes #666
---
 agent/agent.go                                     |  20 ++--
 input.go                                           |   2 +-
 internal/config/config.go                          |  12 ++-
 internal/models/running_output.go                  | 100 +++++++++++++-----
 .../inputs/github_webhooks/github_webhooks.go      |   2 +-
 .../inputs/kafka_consumer/kafka_consumer.go        |   2 +-
 .../kafka_consumer_integration_test.go             |  12 +--
 plugins/inputs/mqtt_consumer/mqtt_consumer.go      |   2 +-
 plugins/inputs/nats_consumer/nats_consumer.go      |   4 +-
 plugins/inputs/statsd/statsd.go                    |   2 +-
 10 files changed, 111 insertions(+), 47 deletions(-)

diff --git a/agent/agent.go b/agent/agent.go
index bd52e78756079..5a70097fc1d69 100644
--- a/agent/agent.go
+++ b/agent/agent.go
@@ -58,7 +58,8 @@ func (a *Agent) Connect() error {
         }
         err := o.Output.Connect()
         if err != nil {
-            log.Printf("Failed to connect to output %s, retrying in 15s, error was '%s' \n", o.Name, err)
+            log.Printf("Failed to connect to output %s, retrying in 15s, "+
+                "error was '%s' \n", o.Name, err)
             time.Sleep(15 * time.Second)
             err = o.Output.Connect()
             if err != nil {
@@ -241,7 +242,7 @@ func (a *Agent) Test() error {
     return nil
 }
 
-// flush writes a list of points to all configured outputs
+// flush writes a list of metrics to all configured outputs
 func (a *Agent) flush() {
     var wg sync.WaitGroup
 
@@ -260,7 +261,7 @@ func (a *Agent) flush() {
     wg.Wait()
 }
 
-// flusher monitors the points input channel and flushes on the minimum interval
+// flusher monitors the metrics input channel and flushes on the minimum interval
 func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric) error {
     // Inelegant, but this sleep is to allow the Gather threads to run, so that
     // the flusher will flush after metrics are collected.
@@ -271,14 +272,14 @@ func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric) er
     for {
         select {
         case <-shutdown:
-            log.Println("Hang on, flushing any cached points before shutdown")
+            log.Println("Hang on, flushing any cached metrics before shutdown")
             a.flush()
             return nil
         case <-ticker.C:
             a.flush()
         case m := <-metricC:
             for _, o := range a.Config.Outputs {
-                o.AddPoint(m)
+                o.AddMetric(m)
             }
         }
     }
@@ -318,8 +319,8 @@ func (a *Agent) Run(shutdown chan struct{}) error {
         a.Config.Agent.Interval.Duration, a.Config.Agent.Debug, a.Config.Agent.Quiet,
         a.Config.Agent.Hostname, a.Config.Agent.FlushInterval.Duration)
 
-    // channel shared between all input threads for accumulating points
-    metricC := make(chan telegraf.Metric, 1000)
+    // channel shared between all input threads for accumulating metrics
+    metricC := make(chan telegraf.Metric, 10000)
 
     // Round collection to nearest interval by sleeping
     if a.Config.Agent.RoundInterval {
@@ -342,7 +343,10 @@ func (a *Agent) Run(shutdown chan struct{}) error {
         // Start service of any ServicePlugins
         switch p := input.Input.(type) {
         case telegraf.ServiceInput:
-            if err := p.Start(); err != nil {
+            acc := NewAccumulator(input.Config, metricC)
+            acc.SetDebug(a.Config.Agent.Debug)
+            acc.setDefaultTags(a.Config.Tags)
+            if err := p.Start(acc); err != nil {
                 log.Printf("Service for input %s failed to start, exiting\n%s\n",
                     input.Name, err.Error())
                 return err
diff --git a/input.go b/input.go
index 6992c1b433a82..f7e1493e2de18 100644
--- a/input.go
+++ b/input.go
@@ -24,7 +24,7 @@ type ServiceInput interface {
     Gather(Accumulator) error
 
     // Start starts the ServiceInput's service, whatever that may be
-    Start() error
+    Start(Accumulator) error
 
     // Stop stops the services and closes any necessary channels and connections
     Stop()
diff --git a/internal/config/config.go b/internal/config/config.go
index ffd4f632a9765..82246f2a48ff5 100644
--- a/internal/config/config.go
+++ b/internal/config/config.go
@@ -68,7 +68,7 @@ type AgentConfig struct {
     // same time, which can have a measurable effect on the system.
     CollectionJitter internal.Duration
 
-    // Interval at which to flush data
+    // FlushInterval is the Interval at which to flush data
     FlushInterval internal.Duration
 
     // FlushJitter Jitters the flush interval by a random amount.
@@ -82,6 +82,11 @@ type AgentConfig struct {
     // full, the oldest metrics will be overwritten.
     MetricBufferLimit int
 
+    // FlushBufferWhenFull tells Telegraf to flush the metric buffer whenever
+    // it fills up, regardless of FlushInterval. Setting this option to true
+    // does _not_ deactivate FlushInterval.
+    FlushBufferWhenFull bool
+
     // TODO(cam): Remove UTC and Precision parameters, they are no longer
     // valid for the agent config. Leaving them here for now for backwards-
     // compatability
@@ -157,6 +162,8 @@ var header = `##################################################################
   ### Telegraf will cache metric_buffer_limit metrics for each output, and will
   ### flush this buffer on a successful write.
   metric_buffer_limit = 10000
+  ### Flush the buffer whenever full, regardless of flush_interval.
+  flush_buffer_when_full = true
 
   ### Collection jitter is used to jitter the collection by a random amount.
   ### Each plugin will sleep for a random time within jitter before collecting.
@@ -421,8 +428,9 @@ func (c *Config) addOutput(name string, table *ast.Table) error {
 
     ro := internal_models.NewRunningOutput(name, output, outputConfig)
     if c.Agent.MetricBufferLimit > 0 {
-        ro.PointBufferLimit = c.Agent.MetricBufferLimit
+        ro.MetricBufferLimit = c.Agent.MetricBufferLimit
     }
+    ro.FlushBufferWhenFull = c.Agent.FlushBufferWhenFull
     ro.Quiet = c.Agent.Quiet
     c.Outputs = append(c.Outputs, ro)
     return nil
diff --git a/internal/models/running_output.go b/internal/models/running_output.go
index 49a01f8ee108b..663d913f2e328 100644
--- a/internal/models/running_output.go
+++ b/internal/models/running_output.go
@@ -2,22 +2,33 @@ package internal_models
 
 import (
     "log"
+    "sync"
     "time"
 
     "github.com/influxdata/telegraf"
 )
 
-const DEFAULT_POINT_BUFFER_LIMIT = 10000
+const (
+    // Default number of metrics kept between flushes.
+    DEFAULT_METRIC_BUFFER_LIMIT = 10000
+
+    // Limit how many full metric buffers are kept due to failed writes.
+    FULL_METRIC_BUFFERS_LIMIT = 100
+)
 
 type RunningOutput struct {
-    Name             string
-    Output           telegraf.Output
-    Config           *OutputConfig
-    Quiet            bool
-    PointBufferLimit int
+    Name                string
+    Output              telegraf.Output
+    Config              *OutputConfig
+    Quiet               bool
+    MetricBufferLimit   int
+    FlushBufferWhenFull bool
 
     metrics          []telegraf.Metric
+    tmpmetrics       chan []telegraf.Metric
     overwriteCounter int
+
+    sync.Mutex
 }
 
 func NewRunningOutput(
@@ -26,44 +37,85 @@ func NewRunningOutput(
     name string,
     output telegraf.Output,
     conf *OutputConfig,
 ) *RunningOutput {
     ro := &RunningOutput{
-        Name:             name,
-        metrics:          make([]telegraf.Metric, 0),
-        Output:           output,
-        Config:           conf,
-        PointBufferLimit: DEFAULT_POINT_BUFFER_LIMIT,
+        Name:              name,
+        metrics:           make([]telegraf.Metric, 0),
+        tmpmetrics:        make(chan []telegraf.Metric, FULL_METRIC_BUFFERS_LIMIT),
+        Output:            output,
+        Config:            conf,
+        MetricBufferLimit: DEFAULT_METRIC_BUFFER_LIMIT,
     }
     return ro
 }
 
-func (ro *RunningOutput) AddPoint(point telegraf.Metric) {
+func (ro *RunningOutput) AddMetric(metric telegraf.Metric) {
     if ro.Config.Filter.IsActive {
-        if !ro.Config.Filter.ShouldMetricPass(point) {
+        if !ro.Config.Filter.ShouldMetricPass(metric) {
             return
         }
     }
 
-    if len(ro.metrics) < ro.PointBufferLimit {
-        ro.metrics = append(ro.metrics, point)
+    if len(ro.metrics) < ro.MetricBufferLimit {
+        ro.Lock()
+        ro.metrics = append(ro.metrics, metric)
+        ro.Unlock()
     } else {
-        log.Printf("WARNING: overwriting cached metrics, you may want to " +
-            "increase the metric_buffer_limit setting in your [agent] config " +
-            "if you do not wish to overwrite metrics.\n")
-        if ro.overwriteCounter == len(ro.metrics) {
-            ro.overwriteCounter = 0
+        if ro.FlushBufferWhenFull {
+            ro.Lock()
+            tmpmetrics := make([]telegraf.Metric, len(ro.metrics))
+            copy(tmpmetrics, ro.metrics)
+            ro.metrics = make([]telegraf.Metric, 0)
+            ro.Unlock()
+            err := ro.write(tmpmetrics)
+            if err != nil {
+                log.Printf("ERROR writing full metric buffer to output %s, %s",
+                    ro.Name, err)
+                if len(ro.tmpmetrics) == FULL_METRIC_BUFFERS_LIMIT {
+                    // drop one on the floor
+                    <-ro.tmpmetrics
+                }
+                ro.tmpmetrics <- tmpmetrics
+            }
+        } else {
+            log.Printf("WARNING: overwriting cached metrics, you may want to " +
+                "increase the metric_buffer_limit setting in your [agent] " +
+                "config if you do not wish to overwrite metrics.\n")
+            if ro.overwriteCounter == len(ro.metrics) {
+                ro.overwriteCounter = 0
+            }
+            ro.metrics[ro.overwriteCounter] = metric
+            ro.overwriteCounter++
         }
-        ro.metrics[ro.overwriteCounter] = point
-        ro.overwriteCounter++
     }
 }
 
 func (ro *RunningOutput) Write() error {
+    ro.Lock()
+    err := ro.write(ro.metrics)
+    ro.Unlock()
+    if err != nil {
+        return err
+    }
+
+    // Write any cached metric buffers that failed previously
+    ntmps := len(ro.tmpmetrics)
+    for i := 0; i < ntmps; i++ {
+        tmpmetrics := <-ro.tmpmetrics
+        if err := ro.write(tmpmetrics); err != nil {
+            return err
+        }
+    }
+
+    return nil
+}
+
+func (ro *RunningOutput) write(metrics []telegraf.Metric) error {
     start := time.Now()
-    err := ro.Output.Write(ro.metrics)
+    err := ro.Output.Write(metrics)
     elapsed := time.Since(start)
     if err == nil {
         if !ro.Quiet {
             log.Printf("Wrote %d metrics to output %s in %s\n",
-                len(ro.metrics), ro.Name, elapsed)
+                len(metrics), ro.Name, elapsed)
         }
         ro.metrics = make([]telegraf.Metric, 0)
         ro.overwriteCounter = 0
diff --git a/plugins/inputs/github_webhooks/github_webhooks.go b/plugins/inputs/github_webhooks/github_webhooks.go
index a66563addb292..6dc97f5a332b0 100644
--- a/plugins/inputs/github_webhooks/github_webhooks.go
+++ b/plugins/inputs/github_webhooks/github_webhooks.go
@@ -61,7 +61,7 @@ func (gh *GithubWebhooks) Listen() {
     }
 }
 
-func (gh *GithubWebhooks) Start() error {
+func (gh *GithubWebhooks) Start(_ telegraf.Accumulator) error {
     go gh.Listen()
     log.Printf("Started the github_webhooks service on %s\n", gh.ServiceAddress)
     return nil
diff --git a/plugins/inputs/kafka_consumer/kafka_consumer.go b/plugins/inputs/kafka_consumer/kafka_consumer.go
index 9fa47dee9bf1e..7ff06d4acebbc 100644
--- a/plugins/inputs/kafka_consumer/kafka_consumer.go
+++ b/plugins/inputs/kafka_consumer/kafka_consumer.go
@@ -72,7 +72,7 @@ func (k *Kafka) SetParser(parser parsers.Parser) {
     k.parser = parser
 }
 
-func (k *Kafka) Start() error {
+func (k *Kafka) Start(_ telegraf.Accumulator) error {
     k.Lock()
     defer k.Unlock()
     var consumerErr error
diff --git a/plugins/inputs/kafka_consumer/kafka_consumer_integration_test.go b/plugins/inputs/kafka_consumer/kafka_consumer_integration_test.go
index 458d43d355e26..527856b41b3b1 100644
--- a/plugins/inputs/kafka_consumer/kafka_consumer_integration_test.go
+++ b/plugins/inputs/kafka_consumer/kafka_consumer_integration_test.go
@@ -44,7 +44,12 @@ func TestReadsMetricsFromKafka(t *testing.T) {
     }
     p, _ := parsers.NewInfluxParser()
     k.SetParser(p)
-    if err := k.Start(); err != nil {
+
+    // Verify that we can now gather the sent message
+    var acc testutil.Accumulator
+    // Sanity check
+    assert.Equal(t, 0, len(acc.Metrics), "There should not be any points")
+    if err := k.Start(&acc); err != nil {
         t.Fatal(err.Error())
     } else {
         defer k.Stop()
@@ -52,11 +57,6 @@ func TestReadsMetricsFromKafka(t *testing.T) {
 
     waitForPoint(k, t)
 
-    // Verify that we can now gather the sent message
-    var acc testutil.Accumulator
-    // Sanity check
-    assert.Equal(t, 0, len(acc.Metrics), "There should not be any points")
-
     // Gather points
     err = k.Gather(&acc)
     require.NoError(t, err)
diff --git a/plugins/inputs/mqtt_consumer/mqtt_consumer.go b/plugins/inputs/mqtt_consumer/mqtt_consumer.go
index 8ca0d44b1a281..1eab50944ba48 100644
--- a/plugins/inputs/mqtt_consumer/mqtt_consumer.go
+++ b/plugins/inputs/mqtt_consumer/mqtt_consumer.go
@@ -89,7 +89,7 @@ func (m *MQTTConsumer) SetParser(parser parsers.Parser) {
     m.parser = parser
 }
 
-func (m *MQTTConsumer) Start() error {
+func (m *MQTTConsumer) Start(_ telegraf.Accumulator) error {
     m.Lock()
     defer m.Unlock()
     if m.QoS > 2 || m.QoS < 0 {
diff --git a/plugins/inputs/nats_consumer/nats_consumer.go b/plugins/inputs/nats_consumer/nats_consumer.go
index 56d56990f27a8..2de7062d85b34 100644
--- a/plugins/inputs/nats_consumer/nats_consumer.go
+++ b/plugins/inputs/nats_consumer/nats_consumer.go
@@ -55,7 +55,7 @@ var sampleConfig = `
   queue_group = "telegraf_consumers"
   ### Maximum number of metrics to buffer between collection intervals
   metric_buffer = 100000
-  
+
  ### Data format to consume. This can be "json", "influx" or "graphite"
  ### Each data format has it's own unique set of configuration options, read
  ### more about them here:
@@ -84,7 +84,7 @@ func (n *natsConsumer) natsErrHandler(c *nats.Conn, s *nats.Subscription, e erro
 }
 
 // Start the nats consumer. Caller must call *natsConsumer.Stop() to clean up.
-func (n *natsConsumer) Start() error {
+func (n *natsConsumer) Start(_ telegraf.Accumulator) error {
     n.Lock()
     defer n.Unlock()
 
diff --git a/plugins/inputs/statsd/statsd.go b/plugins/inputs/statsd/statsd.go
index fb8de402e4185..470e318848cc4 100644
--- a/plugins/inputs/statsd/statsd.go
+++ b/plugins/inputs/statsd/statsd.go
@@ -213,7 +213,7 @@ func (s *Statsd) Gather(acc telegraf.Accumulator) error {
     return nil
 }
 
-func (s *Statsd) Start() error {
+func (s *Statsd) Start(_ telegraf.Accumulator) error {
     // Make data structures
     s.done = make(chan struct{})
     s.in = make(chan []byte, s.AllowedPendingMessages)