Skip to content

Commit

Permalink
Add tracking limiting to nats_consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
danielnelson committed Nov 1, 2018
1 parent 60c98c8 commit 0d8c445
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 174 deletions.
107 changes: 67 additions & 40 deletions plugins/inputs/nats_consumer/nats_consumer.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package natsconsumer

import (
"context"
"fmt"
"log"
"sync"
Expand All @@ -11,6 +12,13 @@ import (
nats "github.com/nats-io/go-nats"
)

// defaultMaxMessagesInFlight is the value the plugin registration assigns to
// natsConsumer.MaxMessagesInFlight when the user does not configure
// max_messages_in_flight.
var defaultMaxMessagesInFlight = 1000

// Building blocks for a channel-based counting semaphore.
type (
	// empty is a zero-size token; only its presence in the channel matters.
	empty struct{}

	// semaphore is a channel of tokens. Sending a token takes a slot and
	// receiving one releases it, so the channel's capacity is the limit.
	semaphore chan empty
)

type natsError struct {
conn *nats.Conn
sub *nats.Subscription
Expand All @@ -32,22 +40,22 @@ type natsConsumer struct {
PendingMessageLimit int
PendingBytesLimit int

MaxMessagesInFlight int `toml:"max_messages_in_flight"`

// Legacy metric buffer support; deprecated in v0.10.3
MetricBuffer int

parser parsers.Parser

sync.Mutex
wg sync.WaitGroup
Conn *nats.Conn
Subs []*nats.Subscription
conn *nats.Conn
subs []*nats.Subscription

parser parsers.Parser
// channel for all incoming NATS messages
in chan *nats.Msg
// channel for all NATS read errors
errs chan error
done chan struct{}
acc telegraf.Accumulator
errs chan error
acc telegraf.TrackingAccumulator
wg sync.WaitGroup
cancel context.CancelFunc
}

var sampleConfig = `
Expand All @@ -65,6 +73,10 @@ var sampleConfig = `
# pending_message_limit = 65536
# pending_bytes_limit = 67108864
## Max messages to read from the server that have not been written by an
## output.
# max_messages_in_flight = 1000
## Data format to consume.
## Each data format has its own unique set of configuration options, read
## more about them here:
Expand Down Expand Up @@ -94,10 +106,7 @@ func (n *natsConsumer) natsErrHandler(c *nats.Conn, s *nats.Subscription, e erro

// Start the nats consumer. Caller must call *natsConsumer.Stop() to clean up.
func (n *natsConsumer) Start(acc telegraf.Accumulator) error {
n.Lock()
defer n.Unlock()

n.acc = acc
n.acc = acc.WithTracking(n.MaxMessagesInFlight)

var connectErr error

Expand All @@ -112,89 +121,106 @@ func (n *natsConsumer) Start(acc telegraf.Accumulator) error {

opts.Secure = n.Secure

if n.Conn == nil || n.Conn.IsClosed() {
n.Conn, connectErr = opts.Connect()
if n.conn == nil || n.conn.IsClosed() {
n.conn, connectErr = opts.Connect()
if connectErr != nil {
return connectErr
}

// Setup message and error channels
n.errs = make(chan error)
n.Conn.SetErrorHandler(n.natsErrHandler)
n.conn.SetErrorHandler(n.natsErrHandler)

n.in = make(chan *nats.Msg, 1000)
for _, subj := range n.Subjects {
sub, err := n.Conn.QueueSubscribe(subj, n.QueueGroup, func(m *nats.Msg) {
sub, err := n.conn.QueueSubscribe(subj, n.QueueGroup, func(m *nats.Msg) {
n.in <- m
})
if err != nil {
return err
}
// ensure that the subscription has been processed by the server
if err = n.Conn.Flush(); err != nil {
if err = n.conn.Flush(); err != nil {
return err
}
// set the subscription pending limits
if err = sub.SetPendingLimits(n.PendingMessageLimit, n.PendingBytesLimit); err != nil {
return err
}
n.Subs = append(n.Subs, sub)
n.subs = append(n.subs, sub)
}
}

n.done = make(chan struct{})
ctx, cancel := context.WithCancel(context.Background())
n.cancel = cancel

// Start the message reader
n.wg.Add(1)
go n.receiver()
go func() {
defer n.wg.Done()
go n.receiver(ctx)
}()

log.Printf("I! Started the NATS consumer service, nats: %v, subjects: %v, queue: %v\n",
n.Conn.ConnectedUrl(), n.Subjects, n.QueueGroup)
n.conn.ConnectedUrl(), n.Subjects, n.QueueGroup)

return nil
}

// NOTE(review): this span comes from a rendered commit diff with the +/-
// markers stripped, so lines of the pre-change and post-change receiver are
// interleaved (note the two func headers). The "pre-change"/"post-change"
// labels below are a best reading of the diff — confirm against the raw patch.
//
// receiver() reads all incoming messages from NATS, and parses them into
// telegraf metrics.
//
// In the post-change form it loops until ctx is cancelled and uses sem, a
// buffered channel of capacity n.MaxMessagesInFlight, as a counting
// semaphore: a slot is taken before a message is read from n.in, and a slot
// is released each time n.acc.Delivered() fires.
//
// pre-change header and deferred WaitGroup release:
func (n *natsConsumer) receiver() {
defer n.wg.Done()
// post-change header:
func (n *natsConsumer) receiver(ctx context.Context) {
sem := make(semaphore, n.MaxMessagesInFlight)

for {
select {
// pre-change shutdown signal (n.done), followed by its post-change
// replacement (ctx.Done()); both share the return below.
case <-n.done:
case <-ctx.Done():
return
// A delivery notification frees one semaphore slot.
case <-n.acc.Delivered():
<-sem
case err := <-n.errs:
// pre-change handling (through the AddFields loop below): the read error is
// wrapped with a prefix, messages are parsed directly in this outer select,
// and every parsed metric is added individually with AddFields.
n.acc.AddError(fmt.Errorf("E! error reading from %s\n", err.Error()))
case msg := <-n.in:
metrics, err := n.parser.Parse(msg.Data)
if err != nil {
n.acc.AddError(fmt.Errorf("E! subject: %s, error: %s", msg.Subject, err.Error()))
}

for _, metric := range metrics {
n.acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time())
// post-change body of the `case err := <-n.errs` arm: forward the error as-is.
n.acc.AddError(err)
// post-change: this arm is only selectable while sem has spare capacity, so
// a message is read from n.in only after a slot has been taken.
case sem <- empty{}:
select {
case <-ctx.Done():
return
case err := <-n.errs:
// No message was consumed, so hand back the slot taken by the outer case.
<-sem
n.acc.AddError(err)
case <-n.acc.Delivered():
// Two releases: one for the delivery just reported, and one for the slot
// taken by the outer case, which ended up not being used for a message.
<-sem
<-sem
case msg := <-n.in:
metrics, err := n.parser.Parse(msg.Data)
if err != nil {
n.acc.AddError(fmt.Errorf("E! subject: %s, error: %s", msg.Subject, err.Error()))
// Nothing was handed to the accumulator for this message, so release the
// slot here and go back to the top of the loop.
<-sem
continue
}

// The slot stays taken after this call; it is released by a later
// n.acc.Delivered() receive.
n.acc.AddTrackingMetricGroup(metrics)
}
}
}
}

// clean unsubscribes every stored subscription, reporting each failure to the
// accumulator, and then closes the NATS connection if one exists and is not
// already closed.
//
// NOTE(review): rendered diff without +/- markers — the n.Subs / n.Conn lines
// appear to be the pre-change spelling and the n.subs / n.conn lines their
// post-change replacements; confirm against the raw patch.
func (n *natsConsumer) clean() {
// pre-change loop header, then its post-change replacement:
for _, sub := range n.Subs {
for _, sub := range n.subs {
if err := sub.Unsubscribe(); err != nil {
n.acc.AddError(fmt.Errorf("E! Error unsubscribing from subject %s in queue %s: %s\n",
sub.Subject, sub.Queue, err.Error()))
}
}

// pre-change connection check and close, then the post-change pair:
if n.Conn != nil && !n.Conn.IsClosed() {
n.Conn.Close()
if n.conn != nil && !n.conn.IsClosed() {
n.conn.Close()
}
}

// Stop signals the receiver to exit, waits on n.wg, and then calls clean() to
// unsubscribe and close the connection.
//
// NOTE(review): rendered diff without +/- markers — n.Lock(), close(n.done)
// and n.Unlock() appear to be pre-change lines, with n.cancel() as the
// post-change shutdown signal; confirm against the raw patch.
//
// NOTE(review): in Start, the goroutine that defers n.wg.Done() launches
// receiver with a further `go` statement, so Done runs as soon as receiver has
// been spawned. n.wg.Wait() below therefore may not wait for receiver to
// return before clean() runs — verify whether that is intended.
func (n *natsConsumer) Stop() {
n.Lock()
close(n.done)
n.cancel()
n.wg.Wait()
n.clean()
n.Unlock()
}

func (n *natsConsumer) Gather(acc telegraf.Accumulator) error {
Expand All @@ -210,6 +236,7 @@ func init() {
QueueGroup: "telegraf_consumers",
PendingBytesLimit: nats.DefaultSubPendingBytesLimit,
PendingMessageLimit: nats.DefaultSubPendingMsgsLimit,
MaxMessagesInFlight: defaultMaxMessagesInFlight,
}
})
}
134 changes: 0 additions & 134 deletions plugins/inputs/nats_consumer/nats_consumer_test.go

This file was deleted.

0 comments on commit 0d8c445

Please sign in to comment.