diff --git a/plugins/inputs/nats_consumer/nats_consumer.go b/plugins/inputs/nats_consumer/nats_consumer.go index dac80476d28ca..47fa645169785 100644 --- a/plugins/inputs/nats_consumer/nats_consumer.go +++ b/plugins/inputs/nats_consumer/nats_consumer.go @@ -1,6 +1,7 @@ package natsconsumer import ( + "context" "fmt" "log" "sync" @@ -11,6 +12,13 @@ import ( nats "github.com/nats-io/go-nats" ) +var ( + defaultMaxMessagesInFlight = 1000 +) + +type empty struct{} +type semaphore chan empty + type natsError struct { conn *nats.Conn sub *nats.Subscription @@ -32,22 +40,22 @@ type natsConsumer struct { PendingMessageLimit int PendingBytesLimit int + MaxMessagesInFlight int `toml:"max_messages_in_flight"` + // Legacy metric buffer support; deprecated in v0.10.3 MetricBuffer int - parser parsers.Parser - - sync.Mutex - wg sync.WaitGroup - Conn *nats.Conn - Subs []*nats.Subscription + conn *nats.Conn + subs []*nats.Subscription + parser parsers.Parser // channel for all incoming NATS messages in chan *nats.Msg // channel for all NATS read errors - errs chan error - done chan struct{} - acc telegraf.Accumulator + errs chan error + acc telegraf.TrackingAccumulator + wg sync.WaitGroup + cancel context.CancelFunc } var sampleConfig = ` @@ -65,6 +73,10 @@ var sampleConfig = ` # pending_message_limit = 65536 # pending_bytes_limit = 67108864 + ## Max messages to read from the server that have not been written by an + ## output. + # max_messages_in_flight = 1000 + ## Data format to consume. ## Each data format has its own unique set of configuration options, read ## more about them here: @@ -94,10 +106,7 @@ func (n *natsConsumer) natsErrHandler(c *nats.Conn, s *nats.Subscription, e erro // Start the nats consumer. Caller must call *natsConsumer.Stop() to clean up. func (n *natsConsumer) Start(acc telegraf.Accumulator) error { - n.Lock() - defer n.Unlock() - - n.acc = acc + n.acc = acc.WithTracking(n.MaxMessagesInFlight) var connectErr error @@ -112,89 +121,106 @@ func (n *natsConsumer) Start(acc telegraf.Accumulator) error { opts.Secure = n.Secure - if n.Conn == nil || n.Conn.IsClosed() { - n.Conn, connectErr = opts.Connect() + if n.conn == nil || n.conn.IsClosed() { + n.conn, connectErr = opts.Connect() if connectErr != nil { return connectErr } // Setup message and error channels n.errs = make(chan error) - n.Conn.SetErrorHandler(n.natsErrHandler) + n.conn.SetErrorHandler(n.natsErrHandler) n.in = make(chan *nats.Msg, 1000) for _, subj := range n.Subjects { - sub, err := n.Conn.QueueSubscribe(subj, n.QueueGroup, func(m *nats.Msg) { + sub, err := n.conn.QueueSubscribe(subj, n.QueueGroup, func(m *nats.Msg) { n.in <- m }) if err != nil { return err } // ensure that the subscription has been processed by the server - if err = n.Conn.Flush(); err != nil { + if err = n.conn.Flush(); err != nil { return err } // set the subscription pending limits if err = sub.SetPendingLimits(n.PendingMessageLimit, n.PendingBytesLimit); err != nil { return err } - n.Subs = append(n.Subs, sub) + n.subs = append(n.subs, sub) } } - n.done = make(chan struct{}) + ctx, cancel := context.WithCancel(context.Background()) + n.cancel = cancel // Start the message reader n.wg.Add(1) - go n.receiver() + go func() { + defer n.wg.Done() + go n.receiver(ctx) + }() + log.Printf("I! Started the NATS consumer service, nats: %v, subjects: %v, queue: %v\n", - n.Conn.ConnectedUrl(), n.Subjects, n.QueueGroup) + n.conn.ConnectedUrl(), n.Subjects, n.QueueGroup) return nil } // receiver() reads all incoming messages from NATS, and parses them into // telegraf metrics. -func (n *natsConsumer) receiver() { - defer n.wg.Done() +func (n *natsConsumer) receiver(ctx context.Context) { + sem := make(semaphore, n.MaxMessagesInFlight) + for { select { - case <-n.done: + case <-ctx.Done(): return + case <-n.acc.Delivered(): + <-sem case err := <-n.errs: - n.acc.AddError(fmt.Errorf("E! error reading from %s\n", err.Error())) - case msg := <-n.in: - metrics, err := n.parser.Parse(msg.Data) - if err != nil { - n.acc.AddError(fmt.Errorf("E! subject: %s, error: %s", msg.Subject, err.Error())) - } - - for _, metric := range metrics { - n.acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time()) + n.acc.AddError(err) + case sem <- empty{}: + select { + case <-ctx.Done(): + return + case err := <-n.errs: + <-sem + n.acc.AddError(err) + case <-n.acc.Delivered(): + <-sem + <-sem + case msg := <-n.in: + metrics, err := n.parser.Parse(msg.Data) + if err != nil { + n.acc.AddError(fmt.Errorf("E! subject: %s, error: %s", msg.Subject, err.Error())) + <-sem + continue + } + + n.acc.AddTrackingMetricGroup(metrics) } } } } func (n *natsConsumer) clean() { - for _, sub := range n.Subs { + for _, sub := range n.subs { if err := sub.Unsubscribe(); err != nil { n.acc.AddError(fmt.Errorf("E! Error unsubscribing from subject %s in queue %s: %s\n", sub.Subject, sub.Queue, err.Error())) } } - if n.Conn != nil && !n.Conn.IsClosed() { - n.Conn.Close() + if n.conn != nil && !n.conn.IsClosed() { + n.conn.Close() } } func (n *natsConsumer) Stop() { - n.Lock() - close(n.done) + n.cancel() n.wg.Wait() n.clean() - n.Unlock() } func (n *natsConsumer) Gather(acc telegraf.Accumulator) error { @@ -210,6 +236,7 @@ func init() { QueueGroup: "telegraf_consumers", PendingBytesLimit: nats.DefaultSubPendingBytesLimit, PendingMessageLimit: nats.DefaultSubPendingMsgsLimit, + MaxMessagesInFlight: defaultMaxMessagesInFlight, } }) } diff --git a/plugins/inputs/nats_consumer/nats_consumer_test.go b/plugins/inputs/nats_consumer/nats_consumer_test.go deleted file mode 100644 index a1f499554c392..0000000000000 --- a/plugins/inputs/nats_consumer/nats_consumer_test.go +++ /dev/null @@ -1,134 +0,0 @@ -package natsconsumer - -import ( - "testing" - - "github.com/influxdata/telegraf/plugins/parsers" - "github.com/influxdata/telegraf/testutil" - nats "github.com/nats-io/go-nats" - "github.com/stretchr/testify/assert" -) - -const ( - testMsg = "cpu_load_short,host=server01 value=23422.0 1422568543702900257\n" - testMsgGraphite = "cpu.load.short.graphite 23422 1454780029" - testMsgJSON = "{\"a\": 5, \"b\": {\"c\": 6}}\n" - invalidMsg = "cpu_load_short,host=server01 1422568543702900257\n" - metricBuffer = 5 -) - -func newTestNatsConsumer() (*natsConsumer, chan *nats.Msg) { - in := make(chan *nats.Msg, metricBuffer) - n := &natsConsumer{ - QueueGroup: "test", - Subjects: []string{"telegraf"}, - Servers: []string{"nats://localhost:4222"}, - Secure: false, - in: in, - errs: make(chan error, metricBuffer), - done: make(chan struct{}), - } - return n, in -} - -// Test that the parser parses NATS messages into metrics -func TestRunParser(t *testing.T) { - n, in := newTestNatsConsumer() - acc := testutil.Accumulator{} - n.acc = &acc - defer close(n.done) - - n.parser, _ = parsers.NewInfluxParser() - n.wg.Add(1) - go n.receiver() - in <- natsMsg(testMsg) - - acc.Wait(1) -} - -// Test that the parser ignores invalid messages -func TestRunParserInvalidMsg(t *testing.T) { - n, in := newTestNatsConsumer() - acc := testutil.Accumulator{} - n.acc = &acc - defer close(n.done) - - n.parser, _ = parsers.NewInfluxParser() - n.wg.Add(1) - go n.receiver() - in <- natsMsg(invalidMsg) - - acc.WaitError(1) - assert.Contains(t, acc.Errors[0].Error(), "E! subject: telegraf, error: metric parse error") - assert.EqualValues(t, 0, acc.NMetrics()) -} - -// Test that the parser parses line format messages into metrics -func TestRunParserAndGather(t *testing.T) { - n, in := newTestNatsConsumer() - acc := testutil.Accumulator{} - n.acc = &acc - defer close(n.done) - - n.parser, _ = parsers.NewInfluxParser() - n.wg.Add(1) - go n.receiver() - in <- natsMsg(testMsg) - - n.Gather(&acc) - - acc.Wait(1) - acc.AssertContainsFields(t, "cpu_load_short", - map[string]interface{}{"value": float64(23422)}) -} - -// Test that the parser parses graphite format messages into metrics -func TestRunParserAndGatherGraphite(t *testing.T) { - n, in := newTestNatsConsumer() - acc := testutil.Accumulator{} - n.acc = &acc - defer close(n.done) - - n.parser, _ = parsers.NewGraphiteParser("_", []string{}, nil) - n.wg.Add(1) - go n.receiver() - in <- natsMsg(testMsgGraphite) - - n.Gather(&acc) - - acc.Wait(1) - acc.AssertContainsFields(t, "cpu_load_short_graphite", - map[string]interface{}{"value": float64(23422)}) -} - -// Test that the parser parses json format messages into metrics -func TestRunParserAndGatherJSON(t *testing.T) { - n, in := newTestNatsConsumer() - acc := testutil.Accumulator{} - n.acc = &acc - defer close(n.done) - - n.parser, _ = parsers.NewParser(&parsers.Config{ - DataFormat: "json", - MetricName: "nats_json_test", - }) - n.wg.Add(1) - go n.receiver() - in <- natsMsg(testMsgJSON) - - n.Gather(&acc) - - acc.Wait(1) - acc.AssertContainsFields(t, "nats_json_test", - map[string]interface{}{ - "a": float64(5), - "b_c": float64(6), - }) -} - -func natsMsg(val string) *nats.Msg { - return &nats.Msg{ - Subject: "telegraf", - Data: []byte(val), - } -}