Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Nats Jetstream consumer #11046 add simple support for jetstream subjects #11373

Merged
merged 15 commits into from
Jul 18, 2022
12 changes: 12 additions & 0 deletions plugins/inputs/nats_consumer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ instances of telegraf can read from a NATS cluster in parallel.
## subject(s) to consume
subjects = ["telegraf"]

## jetstream subjects
js_subjects = ["js_telegraf"]
b3rtram marked this conversation as resolved.
Show resolved Hide resolved

## name a queue group
queue_group = "telegraf_consumers"

Expand Down Expand Up @@ -62,3 +65,12 @@ instances of telegraf can read from a NATS cluster in parallel.
[nats]: https://www.nats.io/about/
[input data formats]: /docs/DATA_FORMATS_INPUT.md
[queue group]: https://www.nats.io/documentation/concepts/nats-queueing/

## Metrics

Which data you will get depends on the subjects you consume from nats

## Example Output

Depends on the nats subject input
nats_consumer,host=[] value=1.9 1655972309339341000
43 changes: 39 additions & 4 deletions plugins/inputs/nats_consumer/nats_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type natsConsumer struct {
Username string `toml:"username"`
Password string `toml:"password"`
Credentials string `toml:"credentials"`
JsSubjects []string `toml:"js_subjects"`
b3rtram marked this conversation as resolved.
Show resolved Hide resolved

tls.ClientConfig

Expand All @@ -58,8 +59,10 @@ type natsConsumer struct {
MaxUndeliveredMessages int `toml:"max_undelivered_messages"`
MetricBuffer int `toml:"metric_buffer" deprecated:"0.10.3;2.0.0;option is ignored"`

conn *nats.Conn
subs []*nats.Subscription
conn *nats.Conn
jsConn nats.JetStreamContext
subs []*nats.Subscription
jsSubs []*nats.Subscription

parser parsers.Parser
// channel for all incoming NATS messages
Expand Down Expand Up @@ -142,6 +145,31 @@ func (n *natsConsumer) Start(acc telegraf.Accumulator) error {

n.subs = append(n.subs, sub)
}

if len(n.JsSubjects) > 0 {
var connErr error
n.jsConn, connErr = n.conn.JetStream(nats.PublishAsyncMaxPending(256))
if connErr != nil {
return connErr
}

for _, jsSub := range n.JsSubjects {
sub, err := n.jsConn.QueueSubscribe(jsSub, n.QueueGroup, func(m *nats.Msg) {
n.in <- m
})
if err != nil {
return err
}

// set the subscription pending limits
err = sub.SetPendingLimits(n.PendingMessageLimit, n.PendingBytesLimit)
if err != nil {
return err
}

n.jsSubs = append(n.jsSubs, sub)
}
}
srebhan marked this conversation as resolved.
Show resolved Hide resolved
}

ctx, cancel := context.WithCancel(context.Background())
Expand All @@ -154,8 +182,8 @@ func (n *natsConsumer) Start(acc telegraf.Accumulator) error {
go n.receiver(ctx)
}()

n.Log.Infof("Started the NATS consumer service, nats: %v, subjects: %v, queue: %v",
n.conn.ConnectedUrl(), n.Subjects, n.QueueGroup)
n.Log.Infof("Started the NATS consumer service, nats: %v, subjects: %v, jssubjects: %v, queue: %v",
n.conn.ConnectedUrl(), n.Subjects, n.JsSubjects, n.QueueGroup)

return nil
}
Expand Down Expand Up @@ -205,6 +233,13 @@ func (n *natsConsumer) clean() {
}
}

for _, sub := range n.jsSubs {
if err := sub.Unsubscribe(); err != nil {
n.Log.Errorf("Error unsubscribing from subject %s in queue %s: %s",
sub.Subject, sub.Queue, err.Error())
b3rtram marked this conversation as resolved.
Show resolved Hide resolved
}
}

if n.conn != nil && !n.conn.IsClosed() {
n.conn.Close()
}
Expand Down
3 changes: 3 additions & 0 deletions plugins/inputs/nats_consumer/sample.conf
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
## subject(s) to consume
subjects = ["telegraf"]

## jetstream subjects
js_subjects = ["js_telegraf"]

b3rtram marked this conversation as resolved.
Show resolved Hide resolved
## name a queue group
queue_group = "telegraf_consumers"

Expand Down
131 changes: 131 additions & 0 deletions telegraf.config
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
# Telegraf Configuration
#
# Telegraf is entirely plugin driven. All metrics are gathered from the
b3rtram marked this conversation as resolved.
Show resolved Hide resolved
# declared inputs, and sent to the declared outputs.
#
# Plugins must be declared in here to be active.
# To deactivate a plugin, comment out the name and any variables.
#
# Use 'telegraf -config telegraf.conf -test' to see what metrics a config
# file would generate.
#
# Environment variables can be used anywhere in this config file, simply surround
# them with ${}. For strings the variable must be within quotes (ie, "${STR_VAR}"),
# for numbers and booleans they should be plain (ie, ${INT_VAR}, ${BOOL_VAR})


# Global tags can be specified here in key="value" format.
[global_tags]
# dc = "us-east-1" # will tag all metrics with dc=us-east-1
# rack = "1a"
## Environment variables can be used as tags, and throughout the config file
# user = "$USER"


# Configuration for telegraf agent
[agent]
## Default data collection interval for all inputs
interval = "10s"
## Rounds collection interval to 'interval'
## ie, if interval="10s" then always collect on :00, :10, :20, etc.
round_interval = true

## Telegraf will send metrics to outputs in batches of at most
## metric_batch_size metrics.
## This controls the size of writes that Telegraf sends to output plugins.
metric_batch_size = 1000

## Maximum number of unwritten metrics per output. Increasing this value
## allows for longer periods of output downtime without dropping metrics at the
## cost of higher maximum memory usage.
metric_buffer_limit = 10000

## Collection jitter is used to jitter the collection by a random amount.
## Each plugin will sleep for a random time within jitter before collecting.
## This can be used to avoid many plugins querying things like sysfs at the
## same time, which can have a measurable effect on the system.
collection_jitter = "0s"

## Collection offset is used to shift the collection by the given amount.
## This can be be used to avoid many plugins querying constraint devices
## at the same time by manually scheduling them in time.
# collection_offset = "0s"

## Default flushing interval for all outputs. Maximum flush_interval will be
## flush_interval + flush_jitter
flush_interval = "10s"
## Jitter the flush interval by a random amount. This is primarily to avoid
## large write spikes for users running a large number of telegraf instances.
## ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s
flush_jitter = "0s"

## Collected metrics are rounded to the precision specified. Precision is
## specified as an interval with an integer + unit (e.g. 0s, 10ms, 2us, 4s).
## Valid time units are "ns", "us" (or "µs"), "ms", "s".
##
## By default or when set to "0s", precision will be set to the same
## timestamp order as the collection interval, with the maximum being 1s:
## ie, when interval = "10s", precision will be "1s"
## when interval = "250ms", precision will be "1ms"
##
## Precision will NOT be used for service inputs. It is up to each individual
## service input to set the timestamp at the appropriate precision.
precision = "0s"

## Log at debug level.
# debug = false
## Log only error level messages.
# quiet = false

## Log target controls the destination for logs and can be one of "file",
## "stderr" or, on Windows, "eventlog". When set to "file", the output file
## is determined by the "logfile" setting.
# logtarget = "file"

## Name of the file to be logged to when using the "file" logtarget. If set to
## the empty string then logs are written to stderr.
# logfile = ""

## The logfile will be rotated after the time interval specified. When set
## to 0 no time based rotation is performed. Logs are rotated only when
## written to, if there is no log activity rotation may be delayed.
# logfile_rotation_interval = "0h"

## The logfile will be rotated when it becomes larger than the specified
## size. When set to 0 no size based rotation is performed.
# logfile_rotation_max_size = "0MB"

## Maximum number of rotated archives to keep, any older logs are deleted.
## If set to -1, no archives are removed.
# logfile_rotation_max_archives = 5

## Pick a timezone to use when logging or type 'local' for local time.
## Example: America/Chicago
# log_with_timezone = ""

## Override default hostname, if empty use os.Hostname()
hostname = ""
## If set to true, do no set the "host" tag in the telegraf agent.
omit_hostname = false

## Method of translating SNMP objects. Can be "netsnmp" which
## translates by calling external programs snmptranslate and snmptable,
## or "gosmi" which translates using the built-in gosmi library.
# snmp_translator = "netsnmp"



[[inputs.nats_consumer]]
## urls of NATS servers
servers = ["nats://localhost:4222"]

## subject(s) to consume
subjects = ["telegraf"]


data_format = "json"


[[outputs.file]]
## Files to write to, "stdout" is a specially handled file.
files = ["stdout"]