Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Nats Jetstream consumer #11046 add simple support for jetstream subjects #11373

Merged
merged 15 commits into from
Jul 18, 2022
12 changes: 12 additions & 0 deletions plugins/inputs/nats_consumer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ instances of telegraf can read from a NATS cluster in parallel.
## subject(s) to consume
subjects = ["telegraf"]

## jetstream subjects
jetstream_subjects = ["js_telegraf"]
srebhan marked this conversation as resolved.
Show resolved Hide resolved

## name a queue group
queue_group = "telegraf_consumers"

Expand Down Expand Up @@ -62,3 +65,12 @@ instances of telegraf can read from a NATS cluster in parallel.
[nats]: https://www.nats.io/about/
[input data formats]: /docs/DATA_FORMATS_INPUT.md
[queue group]: https://www.nats.io/documentation/concepts/nats-queueing/

## Metrics

Which data you will get depends on the subjects you consume from nats

## Example Output

Depends on the nats subject input
nats_consumer,host=[] value=1.9 1655972309339341000
47 changes: 42 additions & 5 deletions plugins/inputs/nats_consumer/nats_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type natsConsumer struct {
Username string `toml:"username"`
Password string `toml:"password"`
Credentials string `toml:"credentials"`
JsSubjects []string `toml:"jetstream_subjects"`

tls.ClientConfig

Expand All @@ -58,8 +59,10 @@ type natsConsumer struct {
MaxUndeliveredMessages int `toml:"max_undelivered_messages"`
MetricBuffer int `toml:"metric_buffer" deprecated:"0.10.3;2.0.0;option is ignored"`

conn *nats.Conn
subs []*nats.Subscription
conn *nats.Conn
jsConn nats.JetStreamContext
subs []*nats.Subscription
jsSubs []*nats.Subscription

parser parsers.Parser
// channel for all incoming NATS messages
Expand Down Expand Up @@ -142,6 +145,33 @@ func (n *natsConsumer) Start(acc telegraf.Accumulator) error {

n.subs = append(n.subs, sub)
}

if len(n.JsSubjects) > 0 {
var connErr error
n.jsConn, connErr = n.conn.JetStream(nats.PublishAsyncMaxPending(256))
if connErr != nil {
return connErr
}

if n.jsConn == nil {
srebhan marked this conversation as resolved.
Show resolved Hide resolved
for _, jsSub := range n.JsSubjects {
sub, err := n.jsConn.QueueSubscribe(jsSub, n.QueueGroup, func(m *nats.Msg) {
n.in <- m
})
if err != nil {
return err
}

// set the subscription pending limits
err = sub.SetPendingLimits(n.PendingMessageLimit, n.PendingBytesLimit)
if err != nil {
return err
}

n.jsSubs = append(n.jsSubs, sub)
}
}
}
}

ctx, cancel := context.WithCancel(context.Background())
Expand All @@ -154,8 +184,8 @@ func (n *natsConsumer) Start(acc telegraf.Accumulator) error {
go n.receiver(ctx)
}()

n.Log.Infof("Started the NATS consumer service, nats: %v, subjects: %v, queue: %v",
n.conn.ConnectedUrl(), n.Subjects, n.QueueGroup)
n.Log.Infof("Started the NATS consumer service, nats: %v, subjects: %v, jssubjects: %v, queue: %v",
n.conn.ConnectedUrl(), n.Subjects, n.JsSubjects, n.QueueGroup)

return nil
}
Expand Down Expand Up @@ -201,7 +231,14 @@ func (n *natsConsumer) clean() {
for _, sub := range n.subs {
if err := sub.Unsubscribe(); err != nil {
n.Log.Errorf("Error unsubscribing from subject %s in queue %s: %s",
sub.Subject, sub.Queue, err.Error())
sub.Subject, sub.Queue, err)
}
}

for _, sub := range n.jsSubs {
if err := sub.Unsubscribe(); err != nil {
n.Log.Errorf("Error unsubscribing from subject %s in queue %s: %s",
sub.Subject, sub.Queue, err)
}
}

Expand Down
3 changes: 3 additions & 0 deletions plugins/inputs/nats_consumer/sample.conf
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
## subject(s) to consume
subjects = ["telegraf"]

## jetstream subjects
jetstream_subjects = ["js_telegraf"]

## name a queue group
queue_group = "telegraf_consumers"

Expand Down