From 6c134627142abc8eb8f9b10a8154371fc21e566d Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Mon, 15 Feb 2016 17:21:38 -0700 Subject: [PATCH] Add `Accumulator` to the ServiceInput Start() function closes #666 --- agent/agent.go | 20 +-- input.go | 2 +- internal/config/config.go | 12 +- internal/models/running_output.go | 119 +++++++++++++----- .../inputs/github_webhooks/github_webhooks.go | 2 +- .../inputs/kafka_consumer/kafka_consumer.go | 43 ++----- .../kafka_consumer_integration_test.go | 19 +-- .../kafka_consumer/kafka_consumer_test.go | 58 ++++----- plugins/inputs/mqtt_consumer/mqtt_consumer.go | 55 +++----- .../mqtt_consumer/mqtt_consumer_test.go | 59 ++++----- plugins/inputs/nats_consumer/nats_consumer.go | 38 ++---- .../nats_consumer/nats_consumer_test.go | 65 ++++------ plugins/inputs/statsd/statsd.go | 2 +- testutil/accumulator.go | 14 +++ 14 files changed, 251 insertions(+), 257 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index bd52e78756079..5a70097fc1d69 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -58,7 +58,8 @@ func (a *Agent) Connect() error { } err := o.Output.Connect() if err != nil { - log.Printf("Failed to connect to output %s, retrying in 15s, error was '%s' \n", o.Name, err) + log.Printf("Failed to connect to output %s, retrying in 15s, "+ + "error was '%s' \n", o.Name, err) time.Sleep(15 * time.Second) err = o.Output.Connect() if err != nil { @@ -241,7 +242,7 @@ func (a *Agent) Test() error { return nil } -// flush writes a list of points to all configured outputs +// flush writes a list of metrics to all configured outputs func (a *Agent) flush() { var wg sync.WaitGroup @@ -260,7 +261,7 @@ func (a *Agent) flush() { wg.Wait() } -// flusher monitors the points input channel and flushes on the minimum interval +// flusher monitors the metrics input channel and flushes on the minimum interval func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric) error { // Inelegant, but this sleep is to allow the Gather threads to run, so that // the flusher will flush after metrics are collected. @@ -271,14 +272,14 @@ func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric) er for { select { case <-shutdown: - log.Println("Hang on, flushing any cached points before shutdown") + log.Println("Hang on, flushing any cached metrics before shutdown") a.flush() return nil case <-ticker.C: a.flush() case m := <-metricC: for _, o := range a.Config.Outputs { - o.AddPoint(m) + o.AddMetric(m) } } } @@ -318,8 +319,8 @@ func (a *Agent) Run(shutdown chan struct{}) error { a.Config.Agent.Interval.Duration, a.Config.Agent.Debug, a.Config.Agent.Quiet, a.Config.Agent.Hostname, a.Config.Agent.FlushInterval.Duration) - // channel shared between all input threads for accumulating points - metricC := make(chan telegraf.Metric, 1000) + // channel shared between all input threads for accumulating metrics + metricC := make(chan telegraf.Metric, 10000) // Round collection to nearest interval by sleeping if a.Config.Agent.RoundInterval { @@ -342,7 +343,10 @@ func (a *Agent) Run(shutdown chan struct{}) error { // Start service of any ServicePlugins switch p := input.Input.(type) { case telegraf.ServiceInput: - if err := p.Start(); err != nil { + acc := NewAccumulator(input.Config, metricC) + acc.SetDebug(a.Config.Agent.Debug) + acc.setDefaultTags(a.Config.Tags) + if err := p.Start(acc); err != nil { log.Printf("Service for input %s failed to start, exiting\n%s\n", input.Name, err.Error()) return err diff --git a/input.go b/input.go index 6992c1b433a82..f7e1493e2de18 100644 --- a/input.go +++ b/input.go @@ -24,7 +24,7 @@ type ServiceInput interface { Gather(Accumulator) error // Start starts the ServiceInput's service, whatever that may be - Start() error + Start(Accumulator) error // Stop stops the services and closes any necessary channels and connections Stop() diff --git a/internal/config/config.go b/internal/config/config.go index ffd4f632a9765..82246f2a48ff5 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -68,7 +68,7 @@ type AgentConfig struct { // same time, which can have a measurable effect on the system. CollectionJitter internal.Duration - // Interval at which to flush data + // FlushInterval is the Interval at which to flush data FlushInterval internal.Duration // FlushJitter Jitters the flush interval by a random amount. @@ -82,6 +82,11 @@ type AgentConfig struct { // full, the oldest metrics will be overwritten. MetricBufferLimit int + // FlushBufferWhenFull tells Telegraf to flush the metric buffer whenever + // it fills up, regardless of FlushInterval. Setting this option to true + // does _not_ deactivate FlushInterval. + FlushBufferWhenFull bool + // TODO(cam): Remove UTC and Precision parameters, they are no longer // valid for the agent config. Leaving them here for now for backwards- // compatability @@ -157,6 +162,8 @@ var header = `################################################################## ### Telegraf will cache metric_buffer_limit metrics for each output, and will ### flush this buffer on a successful write. metric_buffer_limit = 10000 + ### Flush the buffer whenever full, regardless of flush_interval. + flush_buffer_when_full = true ### Collection jitter is used to jitter the collection by a random amount. ### Each plugin will sleep for a random time within jitter before collecting. @@ -421,8 +428,9 @@ func (c *Config) addOutput(name string, table *ast.Table) error { ro := internal_models.NewRunningOutput(name, output, outputConfig) if c.Agent.MetricBufferLimit > 0 { - ro.PointBufferLimit = c.Agent.MetricBufferLimit + ro.MetricBufferLimit = c.Agent.MetricBufferLimit } + ro.FlushBufferWhenFull = c.Agent.FlushBufferWhenFull ro.Quiet = c.Agent.Quiet c.Outputs = append(c.Outputs, ro) return nil diff --git a/internal/models/running_output.go b/internal/models/running_output.go index 49a01f8ee108b..b94d94731892d 100644 --- a/internal/models/running_output.go +++ b/internal/models/running_output.go @@ -2,22 +2,34 @@ package internal_models import ( "log" + "sync" "time" "github.com/influxdata/telegraf" ) -const DEFAULT_POINT_BUFFER_LIMIT = 10000 +const ( + // Default number of metrics kept between flushes. + DEFAULT_METRIC_BUFFER_LIMIT = 10000 + + // Limit how many full metric buffers are kept due to failed writes. + FULL_METRIC_BUFFERS_LIMIT = 100 +) type RunningOutput struct { - Name string - Output telegraf.Output - Config *OutputConfig - Quiet bool - PointBufferLimit int + Name string + Output telegraf.Output + Config *OutputConfig + Quiet bool + MetricBufferLimit int + FlushBufferWhenFull bool - metrics []telegraf.Metric - overwriteCounter int + metrics []telegraf.Metric + tmpmetrics map[int][]telegraf.Metric + overwriteI int + mapI int + + sync.Mutex } func NewRunningOutput( @@ -26,47 +38,98 @@ func NewRunningOutput( conf *OutputConfig, ) *RunningOutput { ro := &RunningOutput{ - Name: name, - metrics: make([]telegraf.Metric, 0), - Output: output, - Config: conf, - PointBufferLimit: DEFAULT_POINT_BUFFER_LIMIT, + Name: name, + metrics: make([]telegraf.Metric, 0), + tmpmetrics: make(map[int][]telegraf.Metric), + Output: output, + Config: conf, + MetricBufferLimit: DEFAULT_METRIC_BUFFER_LIMIT, } return ro } -func (ro *RunningOutput) AddPoint(point telegraf.Metric) { +// AddMetric adds a metric to the output. This function can also write cached +// points if FlushBufferWhenFull is true. +func (ro *RunningOutput) AddMetric(metric telegraf.Metric) { if ro.Config.Filter.IsActive { - if !ro.Config.Filter.ShouldMetricPass(point) { + if !ro.Config.Filter.ShouldMetricPass(metric) { return } } - if len(ro.metrics) < ro.PointBufferLimit { - ro.metrics = append(ro.metrics, point) + if len(ro.metrics) < ro.MetricBufferLimit { + ro.Lock() + ro.metrics = append(ro.metrics, metric) + ro.Unlock() } else { - log.Printf("WARNING: overwriting cached metrics, you may want to " + - "increase the metric_buffer_limit setting in your [agent] config " + - "if you do not wish to overwrite metrics.\n") - if ro.overwriteCounter == len(ro.metrics) { - ro.overwriteCounter = 0 + if ro.FlushBufferWhenFull { + ro.Lock() + tmpmetrics := make([]telegraf.Metric, len(ro.metrics)) + copy(tmpmetrics, ro.metrics) + ro.metrics = make([]telegraf.Metric, 0) + ro.Unlock() + err := ro.write(tmpmetrics) + if err != nil { + log.Printf("ERROR writing full metric buffer to output %s, %s", + ro.Name, err) + if len(ro.tmpmetrics) == FULL_METRIC_BUFFERS_LIMIT { + ro.mapI = 0 + // overwrite one + ro.tmpmetrics[ro.mapI] = tmpmetrics + ro.mapI++ + } else { + ro.tmpmetrics[ro.mapI] = tmpmetrics + ro.mapI++ + } + } + } else { + log.Printf("WARNING: overwriting cached metrics, you may want to " + + "increase the metric_buffer_limit setting in your [agent] " + + "config if you do not wish to overwrite metrics.\n") + ro.Lock() + if ro.overwriteI == len(ro.metrics) { + ro.overwriteI = 0 + } + ro.metrics[ro.overwriteI] = metric + ro.overwriteI++ + ro.Unlock() } - ro.metrics[ro.overwriteCounter] = point - ro.overwriteCounter++ } } +// Write writes all cached points to this output. func (ro *RunningOutput) Write() error { + ro.Lock() + err := ro.write(ro.metrics) + if err != nil { + return err + } else { + ro.metrics = make([]telegraf.Metric, 0) + ro.overwriteI = 0 + } + ro.Unlock() + + // Write any cached metric buffers that failed previously + for i, tmpmetrics := range ro.tmpmetrics { + if err := ro.write(tmpmetrics); err != nil { + return err + } else { + delete(ro.tmpmetrics, i) + } + } + + return nil +} + +func (ro *RunningOutput) write(metrics []telegraf.Metric) error { start := time.Now() - err := ro.Output.Write(ro.metrics) + err := ro.Output.Write(metrics) elapsed := time.Since(start) if err == nil { if !ro.Quiet { log.Printf("Wrote %d metrics to output %s in %s\n", - len(ro.metrics), ro.Name, elapsed) + len(metrics), ro.Name, elapsed) } - ro.metrics = make([]telegraf.Metric, 0) - ro.overwriteCounter = 0 } return err } diff --git a/plugins/inputs/github_webhooks/github_webhooks.go b/plugins/inputs/github_webhooks/github_webhooks.go index a66563addb292..6dc97f5a332b0 100644 --- a/plugins/inputs/github_webhooks/github_webhooks.go +++ b/plugins/inputs/github_webhooks/github_webhooks.go @@ -61,7 +61,7 @@ func (gh *GithubWebhooks) Listen() { } } -func (gh *GithubWebhooks) Start() error { +func (gh *GithubWebhooks) Start(_ telegraf.Accumulator) error { go gh.Listen() log.Printf("Started the github_webhooks service on %s\n", gh.ServiceAddress) return nil diff --git a/plugins/inputs/kafka_consumer/kafka_consumer.go b/plugins/inputs/kafka_consumer/kafka_consumer.go index 9fa47dee9bf1e..66fce3fcf8c89 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer.go @@ -1,7 +1,6 @@ package kafka_consumer import ( - "fmt" "log" "strings" "sync" @@ -19,11 +18,13 @@ type Kafka struct { Topics []string ZookeeperPeers []string Consumer *consumergroup.ConsumerGroup - MetricBuffer int + + // Legacy metric buffer support + MetricBuffer int // TODO remove PointBuffer, legacy support PointBuffer int - Offset string + Offset string parser parsers.Parser sync.Mutex @@ -32,9 +33,10 @@ type Kafka struct { in <-chan *sarama.ConsumerMessage // channel for all kafka consumer errors errs <-chan *sarama.ConsumerError - // channel for all incoming parsed kafka metrics - metricC chan telegraf.Metric - done chan struct{} + done chan struct{} + + // keep the accumulator internally: + acc telegraf.Accumulator // doNotCommitMsgs tells the parser not to call CommitUpTo on the consumer // this is mostly for test purposes, but there may be a use-case for it later. @@ -48,8 +50,6 @@ var sampleConfig = ` zookeeper_peers = ["localhost:2181"] ### the name of the consumer group consumer_group = "telegraf_metrics_consumers" - ### Maximum number of metrics to buffer between collection intervals - metric_buffer = 100000 ### Offset (must be either "oldest" or "newest") offset = "oldest" @@ -72,11 +72,13 @@ func (k *Kafka) SetParser(parser parsers.Parser) { k.parser = parser } -func (k *Kafka) Start() error { +func (k *Kafka) Start(acc telegraf.Accumulator) error { k.Lock() defer k.Unlock() var consumerErr error + k.acc = acc + config := consumergroup.NewConfig() switch strings.ToLower(k.Offset) { case "oldest", "": @@ -106,13 +108,6 @@ func (k *Kafka) Start() error { } k.done = make(chan struct{}) - if k.PointBuffer == 0 && k.MetricBuffer == 0 { - k.MetricBuffer = 100000 - } else if k.PointBuffer > 0 { - // Legacy support of PointBuffer field TODO remove - k.MetricBuffer = k.PointBuffer - } - k.metricC = make(chan telegraf.Metric, k.MetricBuffer) // Start the kafka message reader go k.receiver() @@ -138,14 +133,7 @@ func (k *Kafka) receiver() { } for _, metric := range metrics { - fmt.Println(string(metric.Name())) - select { - case k.metricC <- metric: - continue - default: - log.Printf("Kafka Consumer buffer is full, dropping a metric." + - " You may want to increase the metric_buffer setting") - } + k.acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time()) } if !k.doNotCommitMsgs { @@ -169,13 +157,6 @@ func (k *Kafka) Stop() { } func (k *Kafka) Gather(acc telegraf.Accumulator) error { - k.Lock() - defer k.Unlock() - nmetrics := len(k.metricC) - for i := 0; i < nmetrics; i++ { - metric := <-k.metricC - acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time()) - } return nil } diff --git a/plugins/inputs/kafka_consumer/kafka_consumer_integration_test.go b/plugins/inputs/kafka_consumer/kafka_consumer_integration_test.go index 458d43d355e26..e823f49a55c38 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer_integration_test.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer_integration_test.go @@ -44,18 +44,19 @@ func TestReadsMetricsFromKafka(t *testing.T) { } p, _ := parsers.NewInfluxParser() k.SetParser(p) - if err := k.Start(); err != nil { - t.Fatal(err.Error()) - } else { - defer k.Stop() - } - - waitForPoint(k, t) // Verify that we can now gather the sent message var acc testutil.Accumulator + // Sanity check assert.Equal(t, 0, len(acc.Metrics), "There should not be any points") + if err := k.Start(&acc); err != nil { + t.Fatal(err.Error()) + } else { + defer k.Stop() + } + + waitForPoint(&acc, t) // Gather points err = k.Gather(&acc) @@ -77,7 +78,7 @@ func TestReadsMetricsFromKafka(t *testing.T) { // Waits for the metric that was sent to the kafka broker to arrive at the kafka // consumer -func waitForPoint(k *Kafka, t *testing.T) { +func waitForPoint(acc *testutil.Accumulator, t *testing.T) { // Give the kafka container up to 2 seconds to get the point to the consumer ticker := time.NewTicker(5 * time.Millisecond) counter := 0 @@ -87,7 +88,7 @@ func waitForPoint(k *Kafka, t *testing.T) { counter++ if counter > 1000 { t.Fatal("Waited for 5s, point never arrived to consumer") - } else if len(k.metricC) == 1 { + } else if acc.NFields() == 1 { return } } diff --git a/plugins/inputs/kafka_consumer/kafka_consumer_test.go b/plugins/inputs/kafka_consumer/kafka_consumer_test.go index ec69cb926b6e0..e631f6708cdef 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer_test.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer_test.go @@ -4,7 +4,6 @@ import ( "testing" "time" - "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/testutil" @@ -17,29 +16,28 @@ const ( testMsgGraphite = "cpu.load.short.graphite 23422 1454780029" testMsgJSON = "{\"a\": 5, \"b\": {\"c\": 6}}\n" invalidMsg = "cpu_load_short,host=server01 1422568543702900257" - pointBuffer = 5 ) -func NewTestKafka() (*Kafka, chan *sarama.ConsumerMessage) { - in := make(chan *sarama.ConsumerMessage, pointBuffer) +func newTestKafka() (*Kafka, chan *sarama.ConsumerMessage) { + in := make(chan *sarama.ConsumerMessage, 1000) k := Kafka{ ConsumerGroup: "test", Topics: []string{"telegraf"}, ZookeeperPeers: []string{"localhost:2181"}, - PointBuffer: pointBuffer, Offset: "oldest", in: in, doNotCommitMsgs: true, - errs: make(chan *sarama.ConsumerError, pointBuffer), + errs: make(chan *sarama.ConsumerError, 1000), done: make(chan struct{}), - metricC: make(chan telegraf.Metric, pointBuffer), } return &k, in } // Test that the parser parses kafka messages into points func TestRunParser(t *testing.T) { - k, in := NewTestKafka() + k, in := newTestKafka() + acc := testutil.Accumulator{} + k.acc = &acc defer close(k.done) k.parser, _ = parsers.NewInfluxParser() @@ -47,12 +45,14 @@ func TestRunParser(t *testing.T) { in <- saramaMsg(testMsg) time.Sleep(time.Millisecond) - assert.Equal(t, len(k.metricC), 1) + assert.Equal(t, acc.NFields(), 1) } // Test that the parser ignores invalid messages func TestRunParserInvalidMsg(t *testing.T) { - k, in := NewTestKafka() + k, in := newTestKafka() + acc := testutil.Accumulator{} + k.acc = &acc defer close(k.done) k.parser, _ = parsers.NewInfluxParser() @@ -60,27 +60,14 @@ func TestRunParserInvalidMsg(t *testing.T) { in <- saramaMsg(invalidMsg) time.Sleep(time.Millisecond) - assert.Equal(t, len(k.metricC), 0) -} - -// Test that points are dropped when we hit the buffer limit -func TestRunParserRespectsBuffer(t *testing.T) { - k, in := NewTestKafka() - defer close(k.done) - - k.parser, _ = parsers.NewInfluxParser() - go k.receiver() - for i := 0; i < pointBuffer+1; i++ { - in <- saramaMsg(testMsg) - } - time.Sleep(time.Millisecond) - - assert.Equal(t, len(k.metricC), 5) + assert.Equal(t, acc.NFields(), 0) } // Test that the parser parses kafka messages into points func TestRunParserAndGather(t *testing.T) { - k, in := NewTestKafka() + k, in := newTestKafka() + acc := testutil.Accumulator{} + k.acc = &acc defer close(k.done) k.parser, _ = parsers.NewInfluxParser() @@ -88,17 +75,18 @@ func TestRunParserAndGather(t *testing.T) { in <- saramaMsg(testMsg) time.Sleep(time.Millisecond) - acc := testutil.Accumulator{} k.Gather(&acc) - assert.Equal(t, len(acc.Metrics), 1) + assert.Equal(t, acc.NFields(), 1) acc.AssertContainsFields(t, "cpu_load_short", map[string]interface{}{"value": float64(23422)}) } // Test that the parser parses kafka messages into points func TestRunParserAndGatherGraphite(t *testing.T) { - k, in := NewTestKafka() + k, in := newTestKafka() + acc := testutil.Accumulator{} + k.acc = &acc defer close(k.done) k.parser, _ = parsers.NewGraphiteParser("_", []string{}, nil) @@ -106,17 +94,18 @@ func TestRunParserAndGatherGraphite(t *testing.T) { in <- saramaMsg(testMsgGraphite) time.Sleep(time.Millisecond) - acc := testutil.Accumulator{} k.Gather(&acc) - assert.Equal(t, len(acc.Metrics), 1) + assert.Equal(t, acc.NFields(), 1) acc.AssertContainsFields(t, "cpu_load_short_graphite", map[string]interface{}{"value": float64(23422)}) } // Test that the parser parses kafka messages into points func TestRunParserAndGatherJSON(t *testing.T) { - k, in := NewTestKafka() + k, in := newTestKafka() + acc := testutil.Accumulator{} + k.acc = &acc defer close(k.done) k.parser, _ = parsers.NewJSONParser("kafka_json_test", []string{}, nil) @@ -124,10 +113,9 @@ func TestRunParserAndGatherJSON(t *testing.T) { in <- saramaMsg(testMsgJSON) time.Sleep(time.Millisecond) - acc := testutil.Accumulator{} k.Gather(&acc) - assert.Equal(t, len(acc.Metrics), 1) + assert.Equal(t, acc.NFields(), 2) acc.AssertContainsFields(t, "kafka_json_test", map[string]interface{}{ "a": float64(5), diff --git a/plugins/inputs/mqtt_consumer/mqtt_consumer.go b/plugins/inputs/mqtt_consumer/mqtt_consumer.go index 8ca0d44b1a281..ac4b738d7e71d 100644 --- a/plugins/inputs/mqtt_consumer/mqtt_consumer.go +++ b/plugins/inputs/mqtt_consumer/mqtt_consumer.go @@ -15,15 +15,17 @@ import ( ) type MQTTConsumer struct { - Servers []string - Topics []string - Username string - Password string - MetricBuffer int - QoS int `toml:"qos"` + Servers []string + Topics []string + Username string + Password string + QoS int `toml:"qos"` parser parsers.Parser + // Legacy metric buffer support + MetricBuffer int + // Path to CA file SSLCA string `toml:"ssl_ca"` // Path to host cert file @@ -35,13 +37,12 @@ type MQTTConsumer struct { sync.Mutex client *mqtt.Client - // channel for all incoming parsed mqtt metrics - metricC chan telegraf.Metric - // channel for the topics of all incoming metrics (for tagging metrics) - topicC chan string // channel of all incoming raw mqtt messages in chan mqtt.Message done chan struct{} + + // keep the accumulator internally: + acc telegraf.Accumulator } var sampleConfig = ` @@ -56,9 +57,6 @@ var sampleConfig = ` "sensors/#", ] - ### Maximum number of metrics to buffer between collection intervals - metric_buffer = 100000 - ### username and password to connect MQTT server. # username = "telegraf" # password = "metricsmetricsmetricsmetrics" @@ -89,9 +87,11 @@ func (m *MQTTConsumer) SetParser(parser parsers.Parser) { m.parser = parser } -func (m *MQTTConsumer) Start() error { +func (m *MQTTConsumer) Start(acc telegraf.Accumulator) error { m.Lock() defer m.Unlock() + + m.acc = acc if m.QoS > 2 || m.QoS < 0 { return fmt.Errorf("MQTT Consumer, invalid QoS value: %d", m.QoS) } @@ -106,13 +106,8 @@ func (m *MQTTConsumer) Start() error { return token.Error() } - m.in = make(chan mqtt.Message, m.MetricBuffer) + m.in = make(chan mqtt.Message, 1000) m.done = make(chan struct{}) - if m.MetricBuffer == 0 { - m.MetricBuffer = 100000 - } - m.metricC = make(chan telegraf.Metric, m.MetricBuffer) - m.topicC = make(chan string, m.MetricBuffer) topics := make(map[string]byte) for _, topic := range m.Topics { @@ -145,13 +140,9 @@ func (m *MQTTConsumer) receiver() { } for _, metric := range metrics { - select { - case m.metricC <- metric: - m.topicC <- topic - default: - log.Printf("MQTT Consumer buffer is full, dropping a metric." + - " You may want to increase the metric_buffer setting") - } + tags := metric.Tags() + tags["topic"] = topic + m.acc.AddFields(metric.Name(), metric.Fields(), tags, metric.Time()) } } } @@ -169,16 +160,6 @@ func (m *MQTTConsumer) Stop() { } func (m *MQTTConsumer) Gather(acc telegraf.Accumulator) error { - m.Lock() - defer m.Unlock() - nmetrics := len(m.metricC) - for i := 0; i < nmetrics; i++ { - metric := <-m.metricC - topic := <-m.topicC - tags := metric.Tags() - tags["topic"] = topic - acc.AddFields(metric.Name(), metric.Fields(), tags, metric.Time()) - } return nil } diff --git a/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go b/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go index be216dfbbbbd6..9e20e9a8b308d 100644 --- a/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go +++ b/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go @@ -4,7 +4,6 @@ import ( "testing" "time" - "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/testutil" @@ -16,19 +15,15 @@ const ( testMsgGraphite = "cpu.load.short.graphite 23422 1454780029" testMsgJSON = "{\"a\": 5, \"b\": {\"c\": 6}}\n" invalidMsg = "cpu_load_short,host=server01 1422568543702900257" - metricBuffer = 5 ) func newTestMQTTConsumer() (*MQTTConsumer, chan mqtt.Message) { - in := make(chan mqtt.Message, metricBuffer) + in := make(chan mqtt.Message, 100) n := &MQTTConsumer{ - Topics: []string{"telegraf"}, - Servers: []string{"localhost:1883"}, - MetricBuffer: metricBuffer, - in: in, - done: make(chan struct{}), - metricC: make(chan telegraf.Metric, metricBuffer), - topicC: make(chan string, metricBuffer), + Topics: []string{"telegraf"}, + Servers: []string{"localhost:1883"}, + in: in, + done: make(chan struct{}), } return n, in } @@ -36,14 +31,16 @@ func newTestMQTTConsumer() (*MQTTConsumer, chan mqtt.Message) { // Test that the parser parses NATS messages into metrics func TestRunParser(t *testing.T) { n, in := newTestMQTTConsumer() + acc := testutil.Accumulator{} + n.acc = &acc defer close(n.done) n.parser, _ = parsers.NewInfluxParser() go n.receiver() in <- mqttMsg(testMsg) - time.Sleep(time.Millisecond) + time.Sleep(time.Millisecond * 25) - if a := len(n.metricC); a != 1 { + if a := acc.NFields(); a != 1 { t.Errorf("got %v, expected %v", a, 1) } } @@ -51,46 +48,32 @@ func TestRunParser(t *testing.T) { // Test that the parser ignores invalid messages func TestRunParserInvalidMsg(t *testing.T) { n, in := newTestMQTTConsumer() + acc := testutil.Accumulator{} + n.acc = &acc defer close(n.done) n.parser, _ = parsers.NewInfluxParser() go n.receiver() in <- mqttMsg(invalidMsg) - time.Sleep(time.Millisecond) + time.Sleep(time.Millisecond * 25) - if a := len(n.metricC); a != 0 { + if a := acc.NFields(); a != 0 { t.Errorf("got %v, expected %v", a, 0) } } -// Test that metrics are dropped when we hit the buffer limit -func TestRunParserRespectsBuffer(t *testing.T) { - n, in := newTestMQTTConsumer() - defer close(n.done) - - n.parser, _ = parsers.NewInfluxParser() - go n.receiver() - for i := 0; i < metricBuffer+1; i++ { - in <- mqttMsg(testMsg) - } - time.Sleep(time.Millisecond) - - if a := len(n.metricC); a != metricBuffer { - t.Errorf("got %v, expected %v", a, metricBuffer) - } -} - // Test that the parser parses line format messages into metrics func TestRunParserAndGather(t *testing.T) { n, in := newTestMQTTConsumer() + acc := testutil.Accumulator{} + n.acc = &acc defer close(n.done) n.parser, _ = parsers.NewInfluxParser() go n.receiver() in <- mqttMsg(testMsg) - time.Sleep(time.Millisecond) + time.Sleep(time.Millisecond * 25) - acc := testutil.Accumulator{} n.Gather(&acc) if a := len(acc.Metrics); a != 1 { @@ -103,14 +86,15 @@ func TestRunParserAndGather(t *testing.T) { // Test that the parser parses graphite format messages into metrics func TestRunParserAndGatherGraphite(t *testing.T) { n, in := newTestMQTTConsumer() + acc := testutil.Accumulator{} + n.acc = &acc defer close(n.done) n.parser, _ = parsers.NewGraphiteParser("_", []string{}, nil) go n.receiver() in <- mqttMsg(testMsgGraphite) - time.Sleep(time.Millisecond) + time.Sleep(time.Millisecond * 25) - acc := testutil.Accumulator{} n.Gather(&acc) if a := len(acc.Metrics); a != 1 { @@ -123,14 +107,15 @@ func TestRunParserAndGatherGraphite(t *testing.T) { // Test that the parser parses json format messages into metrics func TestRunParserAndGatherJSON(t *testing.T) { n, in := newTestMQTTConsumer() + acc := testutil.Accumulator{} + n.acc = &acc defer close(n.done) n.parser, _ = parsers.NewJSONParser("nats_json_test", []string{}, nil) go n.receiver() in <- mqttMsg(testMsgJSON) - time.Sleep(time.Millisecond) + time.Sleep(time.Millisecond * 25) - acc := testutil.Accumulator{} n.Gather(&acc) if a := len(acc.Metrics); a != 1 { diff --git a/plugins/inputs/nats_consumer/nats_consumer.go b/plugins/inputs/nats_consumer/nats_consumer.go index 56d56990f27a8..7dad47b46529b 100644 --- a/plugins/inputs/nats_consumer/nats_consumer.go +++ b/plugins/inputs/nats_consumer/nats_consumer.go @@ -28,8 +28,10 @@ type natsConsumer struct { Servers []string Secure bool + // Legacy metric buffer support MetricBuffer int - parser parsers.Parser + + parser parsers.Parser sync.Mutex Conn *nats.Conn @@ -39,9 +41,8 @@ type natsConsumer struct { in chan *nats.Msg // channel for all NATS read errors errs chan error - // channel for all incoming parsed metrics - metricC chan telegraf.Metric - done chan struct{} + done chan struct{} + acc telegraf.Accumulator } var sampleConfig = ` @@ -53,9 +54,7 @@ var sampleConfig = ` subjects = ["telegraf"] ### name a queue group queue_group = "telegraf_consumers" - ### Maximum number of metrics to buffer between collection intervals - metric_buffer = 100000 - + ### Data format to consume. This can be "json", "influx" or "graphite" ### Each data format has it's own unique set of configuration options, read ### more about them here: @@ -84,10 +83,12 @@ func (n *natsConsumer) natsErrHandler(c *nats.Conn, s *nats.Subscription, e erro } // Start the nats consumer. Caller must call *natsConsumer.Stop() to clean up. -func (n *natsConsumer) Start() error { +func (n *natsConsumer) Start(acc telegraf.Accumulator) error { n.Lock() defer n.Unlock() + n.acc = acc + var connectErr error opts := nats.DefaultOptions @@ -115,11 +116,6 @@ func (n *natsConsumer) Start() error { } n.done = make(chan struct{}) - if n.MetricBuffer == 0 { - n.MetricBuffer = 100000 - } - - n.metricC = make(chan telegraf.Metric, n.MetricBuffer) // Start the message reader go n.receiver() @@ -146,13 +142,7 @@ func (n *natsConsumer) receiver() { } for _, metric := range metrics { - select { - case n.metricC <- metric: - continue - default: - log.Printf("NATS Consumer buffer is full, dropping a metric." + - " You may want to increase the metric_buffer setting") - } + n.acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time()) } } @@ -163,7 +153,6 @@ func (n *natsConsumer) clean() { n.Lock() defer n.Unlock() close(n.in) - close(n.metricC) close(n.errs) for _, sub := range n.Subs { @@ -185,13 +174,6 @@ func (n *natsConsumer) Stop() { } func (n *natsConsumer) Gather(acc telegraf.Accumulator) error { - n.Lock() - defer n.Unlock() - nmetrics := len(n.metricC) - for i := 0; i < nmetrics; i++ { - metric := <-n.metricC - acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time()) - } return nil } diff --git a/plugins/inputs/nats_consumer/nats_consumer_test.go b/plugins/inputs/nats_consumer/nats_consumer_test.go index 214695d91a672..7141101b21f49 100644 --- a/plugins/inputs/nats_consumer/nats_consumer_test.go +++ b/plugins/inputs/nats_consumer/nats_consumer_test.go @@ -4,7 +4,6 @@ import ( "testing" "time" - "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/testutil" "github.com/nats-io/nats" @@ -21,15 +20,13 @@ const ( func newTestNatsConsumer() (*natsConsumer, chan *nats.Msg) { in := make(chan *nats.Msg, metricBuffer) n := &natsConsumer{ - QueueGroup: "test", - Subjects: []string{"telegraf"}, - Servers: []string{"nats://localhost:4222"}, - Secure: false, - MetricBuffer: metricBuffer, - in: in, - errs: make(chan error, metricBuffer), - done: make(chan struct{}), - metricC: make(chan telegraf.Metric, metricBuffer), + QueueGroup: "test", + Subjects: []string{"telegraf"}, + Servers: []string{"nats://localhost:4222"}, + Secure: false, + in: in, + errs: make(chan error, metricBuffer), + done: make(chan struct{}), } return n, in } @@ -37,61 +34,49 @@ func newTestNatsConsumer() (*natsConsumer, chan *nats.Msg) { // Test that the parser parses NATS messages into metrics func TestRunParser(t *testing.T) { n, in := newTestNatsConsumer() + acc := testutil.Accumulator{} + n.acc = &acc defer close(n.done) n.parser, _ = parsers.NewInfluxParser() go n.receiver() in <- natsMsg(testMsg) - time.Sleep(time.Millisecond) + time.Sleep(time.Millisecond * 25) - if a := len(n.metricC); a != 1 { - t.Errorf("got %v, expected %v", a, 1) + if acc.NFields() != 1 { + t.Errorf("got %v, expected %v", acc.NFields(), 1) } } // Test that the parser ignores invalid messages func TestRunParserInvalidMsg(t *testing.T) { n, in := newTestNatsConsumer() + acc := testutil.Accumulator{} + n.acc = &acc defer close(n.done) n.parser, _ = parsers.NewInfluxParser() go n.receiver() in <- natsMsg(invalidMsg) - time.Sleep(time.Millisecond) + time.Sleep(time.Millisecond * 25) - if a := len(n.metricC); a != 0 { - t.Errorf("got %v, expected %v", a, 0) - } -} - -// Test that metrics are dropped when we hit the buffer limit -func TestRunParserRespectsBuffer(t *testing.T) { - n, in := newTestNatsConsumer() - defer close(n.done) - - n.parser, _ = parsers.NewInfluxParser() - go n.receiver() - for i := 0; i < metricBuffer+1; i++ { - in <- natsMsg(testMsg) - } - time.Sleep(time.Millisecond) - - if a := len(n.metricC); a != metricBuffer { - t.Errorf("got %v, expected %v", a, metricBuffer) + if acc.NFields() != 0 { + t.Errorf("got %v, expected %v", acc.NFields(), 0) } } // Test that the parser parses line format messages into metrics func TestRunParserAndGather(t *testing.T) { n, in := newTestNatsConsumer() + acc := testutil.Accumulator{} + n.acc = &acc defer close(n.done) n.parser, _ = parsers.NewInfluxParser() go n.receiver() in <- natsMsg(testMsg) - time.Sleep(time.Millisecond) + time.Sleep(time.Millisecond * 25) - acc := testutil.Accumulator{} n.Gather(&acc) if a := len(acc.Metrics); a != 1 { @@ -104,14 +89,15 @@ func TestRunParserAndGather(t *testing.T) { // Test that the parser parses graphite format messages into metrics func TestRunParserAndGatherGraphite(t *testing.T) { n, in := newTestNatsConsumer() + acc := testutil.Accumulator{} + n.acc = &acc defer close(n.done) n.parser, _ = parsers.NewGraphiteParser("_", []string{}, nil) go n.receiver() in <- natsMsg(testMsgGraphite) - time.Sleep(time.Millisecond) + time.Sleep(time.Millisecond * 25) - acc := testutil.Accumulator{} n.Gather(&acc) if a := len(acc.Metrics); a != 1 { @@ -124,14 +110,15 @@ func TestRunParserAndGatherGraphite(t *testing.T) { // Test that the parser parses json format messages into metrics func TestRunParserAndGatherJSON(t *testing.T) { n, in := newTestNatsConsumer() + acc := testutil.Accumulator{} + n.acc = &acc defer close(n.done) n.parser, _ = parsers.NewJSONParser("nats_json_test", []string{}, nil) go n.receiver() in <- natsMsg(testMsgJSON) - time.Sleep(time.Millisecond) + time.Sleep(time.Millisecond * 25) - acc := testutil.Accumulator{} n.Gather(&acc) if a := len(acc.Metrics); a != 1 { diff --git a/plugins/inputs/statsd/statsd.go b/plugins/inputs/statsd/statsd.go index fb8de402e4185..470e318848cc4 100644 --- a/plugins/inputs/statsd/statsd.go +++ b/plugins/inputs/statsd/statsd.go @@ -213,7 +213,7 @@ func (s *Statsd) Gather(acc telegraf.Accumulator) error { return nil } -func (s *Statsd) Start() error { +func (s *Statsd) Start(_ telegraf.Accumulator) error { // Make data structures s.done = make(chan struct{}) s.in = make(chan []byte, s.AllowedPendingMessages) diff --git a/testutil/accumulator.go b/testutil/accumulator.go index 7101db091da6c..cb56d8d289528 100644 --- a/testutil/accumulator.go +++ b/testutil/accumulator.go @@ -108,6 +108,8 @@ func (a *Accumulator) Get(measurement string) (*Metric, bool) { // NFields returns the total number of fields in the accumulator, across all // measurements func (a *Accumulator) NFields() int { + a.Lock() + defer a.Unlock() counter := 0 for _, pt := range a.Metrics { for _, _ = range pt.Fields { @@ -123,6 +125,8 @@ func (a *Accumulator) AssertContainsTaggedFields( fields map[string]interface{}, tags map[string]string, ) { + a.Lock() + defer a.Unlock() for _, p := range a.Metrics { if !reflect.DeepEqual(tags, p.Tags) { continue @@ -148,6 +152,8 @@ func (a *Accumulator) AssertContainsFields( measurement string, fields map[string]interface{}, ) { + a.Lock() + defer a.Unlock() for _, p := range a.Metrics { if p.Measurement == measurement { if !reflect.DeepEqual(fields, p.Fields) { @@ -166,6 +172,8 @@ func (a *Accumulator) AssertContainsFields( // HasIntValue returns true if the measurement has an Int value func (a *Accumulator) HasIntField(measurement string, field string) bool { + a.Lock() + defer a.Unlock() for _, p := range a.Metrics { if p.Measurement == measurement { for fieldname, value := range p.Fields { @@ -182,6 +190,8 @@ func (a *Accumulator) HasIntField(measurement string, field string) bool { // HasUIntValue returns true if the measurement has a UInt value func (a *Accumulator) HasUIntField(measurement string, field string) bool { + a.Lock() + defer a.Unlock() for _, p := range a.Metrics { if p.Measurement == measurement { for fieldname, value := range p.Fields { @@ -198,6 +208,8 @@ func (a *Accumulator) HasUIntField(measurement string, field string) bool { // HasFloatValue returns true if the given measurement has a float value func (a *Accumulator) HasFloatField(measurement string, field string) bool { + a.Lock() + defer a.Unlock() for _, p := range a.Metrics { if p.Measurement == measurement { for fieldname, value := range p.Fields { @@ -215,6 +227,8 @@ func (a *Accumulator) HasFloatField(measurement string, field string) bool { // HasMeasurement returns true if the accumulator has a measurement with the // given name func (a *Accumulator) HasMeasurement(measurement string) bool { + a.Lock() + defer a.Unlock() for _, p := range a.Metrics { if p.Measurement == measurement { return true