NATS Monitoring Input Plugin #3186

Closed
1 change: 1 addition & 0 deletions Godeps
@@ -33,6 +33,7 @@ github.com/kballard/go-shellquote d8ec1a69a250a17bb0e419c386eac1f3711dc142
github.com/matttproud/golang_protobuf_extensions c12348ce28de40eed0136aa2b644d0ee0650e56c
github.com/miekg/dns 99f84ae56e75126dd77e5de4fae2ea034a468ca1
github.com/naoina/go-stringutil 6b638e95a32d0c1131db0e7fe83775cbea4a0d0b
github.com/nats-io/gnatsd 393bbb7c031433e68707c8810fda0bfcfbe6ab9b
Contributor:
Can you also add this to docs/LICENSE_OF_DEPENDENCIES.md?

Contributor:
Done

github.com/nats-io/go-nats ea9585611a4ab58a205b9b125ebd74c389a6b898
github.com/nats-io/nats ea9585611a4ab58a205b9b125ebd74c389a6b898
github.com/nats-io/nuid 289cccf02c178dc782430d534e3c1f5b72af807f
1 change: 1 addition & 0 deletions plugins/inputs/all/all.go
@@ -50,6 +50,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/mongodb"
_ "github.com/influxdata/telegraf/plugins/inputs/mqtt_consumer"
_ "github.com/influxdata/telegraf/plugins/inputs/mysql"
_ "github.com/influxdata/telegraf/plugins/inputs/nats"
_ "github.com/influxdata/telegraf/plugins/inputs/nats_consumer"
_ "github.com/influxdata/telegraf/plugins/inputs/net_response"
_ "github.com/influxdata/telegraf/plugins/inputs/nginx"
12 changes: 12 additions & 0 deletions plugins/inputs/nats/README.md
@@ -0,0 +1,12 @@
# NATS Monitoring Input Plugin

The [NATS](http://www.nats.io/about/) monitoring plugin reads from the
monitoring endpoint of the specified NATS instance and submits metrics to InfluxDB.

## Configuration

```toml
[[inputs.nats]]
## The address of the monitoring end-point of the NATS server
server = "http://localhost:8222"
```
79 changes: 79 additions & 0 deletions plugins/inputs/nats/nats.go
@@ -0,0 +1,79 @@
package nats

import (
"fmt"
"io/ioutil"
"net/http"
"time"

"encoding/json"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"

gnatsd "github.com/nats-io/gnatsd/server"
)

type Nats struct {
Server string
}

var sampleConfig = `
## The address of the monitoring end-point of the NATS server
server = "http://localhost:8222"
`

func (n *Nats) SampleConfig() string {
return sampleConfig
}

func (n *Nats) Description() string {
return "Provides metrics about the state of a NATS server"
}

func (n *Nats) Gather(acc telegraf.Accumulator) error {
theServer := fmt.Sprintf("%s/varz", n.Server)

/* download the page we are interested in */
resp, err := http.Get(theServer)
Contributor:
Can you add in a custom transport for this plugin? The best example to follow is the apache input. You don't have to add SSL support or user configurable timeouts, but make sure there is a timeout on the http.Client of 5 seconds.

Contributor:
Done (exposed the timeout as an option too)

if err != nil {
return err
}
defer resp.Body.Close()

bytes, err := ioutil.ReadAll(resp.Body)
if err != nil {
return err
}

var stats = new(gnatsd.Varz)

err = json.Unmarshal(bytes, stats)
if err != nil {
return err
}

acc.AddFields("nats",
Contributor:
Maybe we call the measurement nats_varz in case we later want to support the other monitoring urls.

Contributor:
Done

Contributor:
Why not add the missing numeric values from the /varz endpoint, or at least the ones that change over time (no need to add port)? In particular, what about slow_consumers, routes, cores, and remotes? I am guessing at what some of these mean, so include only the ones that look like time series data.

Contributor:
Good idea: slow_consumers, routes, cores, and remotes added. None of the other fields in Varz seem relevant/interesting.

map[string]interface{}{
"in_msgs": stats.InMsgs,
"out_msgs": stats.OutMsgs,
"uptime": time.Since(stats.Start).Seconds(),
Contributor:
Let's store this in nanoseconds

Contributor:
Done. I've also made this more correct by using stats.Now to calculate the uptime instead of time.Since. Then uptime is the uptime at the point the metrics were generated, not when Gather was called. This also simplifies testing.
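
A minimal sketch of the calculation described in this reply, assuming the `start` and `now` fields of the `/varz` payload decode into `time.Time` values. `varzTimes` and `uptimeNanos` are hypothetical names used only for illustration, standing in for the corresponding fields of the upstream `Varz` struct, and the timestamps are made up.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// varzTimes is a hypothetical stand-in for the two timestamp fields
// of the /varz response; the real plugin decodes into gnatsd's Varz.
type varzTimes struct {
	Start time.Time `json:"start"`
	Now   time.Time `json:"now"`
}

// uptimeNanos returns the server uptime in nanoseconds as of the moment
// the server generated the payload, independent of when it is parsed.
func uptimeNanos(payload []byte) (int64, error) {
	var v varzTimes
	if err := json.Unmarshal(payload, &v); err != nil {
		return 0, err
	}
	return v.Now.Sub(v.Start).Nanoseconds(), nil
}

func main() {
	// Made-up timestamps exactly ten seconds apart.
	payload := []byte(`{"start": "2017-07-24T10:15:26-05:00", "now": "2017-07-24T10:15:36-05:00"}`)

	ns, err := uptimeNanos(payload)
	if err != nil {
		panic(err)
	}
	fmt.Println(ns) // prints 10000000000
}
```

Because the result depends only on the payload, a test can assert an exact value instead of copying the field back from the accumulator.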

"connections": stats.Connections,
"total_connections": stats.TotalConnections,
"in_bytes": stats.InBytes,
"cpu_usage": stats.CPU,
Contributor:
Isn't this the number of CPUs used? If so, I would call it either cpu to match upstream, or cpu_count.

Contributor:
CPU is a float32 containing CPU utilization (Cores has the number of CPU cores). I've renamed the metric to cpu to match upstream and the mem metric (memory usage).

"out_bytes": stats.OutBytes,
"mem": stats.Mem,
"subscriptions": stats.Subscriptions,
}, nil, time.Now())
Contributor:
Add a tag for the server using the URL in the configuration. This will allow the plugin to be used multiple times if needed without conflicts.

Contributor:
Done
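
One way to picture the tagging suggestion, together with the earlier `nats_varz` rename, is the sketch below. `point` and `buildPoint` are hypothetical, simplified stand-ins for what the accumulator records; in the plugin itself the tags map would presumably replace the `nil` passed as the third argument to `acc.AddFields`. The server URLs are invented for the example.

```go
package main

import "fmt"

// point is a hypothetical, simplified view of one recorded metric.
type point struct {
	Measurement string
	Fields      map[string]interface{}
	Tags        map[string]string
}

// buildPoint labels the metric with the configured monitoring URL so that
// several [[inputs.nats]] instances produce distinguishable series.
func buildPoint(server string, connections int) point {
	return point{
		Measurement: "nats_varz",
		Fields:      map[string]interface{}{"connections": connections},
		Tags:        map[string]string{"server": server},
	}
}

func main() {
	// Two plugin instances pointing at different (made-up) servers.
	a := buildPoint("http://nats-a:8222", 2)
	b := buildPoint("http://nats-b:8222", 7)

	fmt.Println(a.Measurement, a.Tags["server"], a.Fields["connections"])
	fmt.Println(b.Measurement, b.Tags["server"], b.Fields["connections"])
}
```

Both points share a measurement name and field keys, so the `server` tag is the only thing keeping their series apart.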


return nil
}

func init() {
inputs.Add("nats", func() telegraf.Input {
return &Nats{
Server: "http://localhost:8222",
}
})
}
114 changes: 114 additions & 0 deletions plugins/inputs/nats/nats_test.go
@@ -0,0 +1,114 @@
package nats

import (
"fmt"
"net/http"
"net/http/httptest"
"testing"

"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

var sampleVarz = `
{
"server_id": "n2afhLHLl64Gcaj7S7jaNa",
"version": "1.0.0",
"go": "go1.8",
"host": "0.0.0.0",
"auth_required": false,
"ssl_required": false,
"tls_required": false,
"tls_verify": false,
"addr": "0.0.0.0",
"max_connections": 65536,
"ping_interval": 120000000000,
"ping_max": 2,
"http_host": "0.0.0.0",
"http_port": 1337,
"https_port": 0,
"auth_timeout": 1,
"max_control_line": 1024,
"cluster": {
"addr": "0.0.0.0",
"cluster_port": 0,
"auth_timeout": 1
},
"tls_timeout": 0.5,
"port": 4222,
"max_payload": 1048576,
"start": "1861-04-12T10:15:26.841483489-05:00",
"now": "2011-10-05T15:24:23.722084098-07:00",
"uptime": "150y5md237h8m57s",
"mem": 15581184,
"cores": 48,
"cpu": 9,
"connections": 2,
"total_connections": 109,
"routes": 0,
"remotes": 0,
"in_msgs": 74148556,
"out_msgs": 68863261,
"in_bytes": 946267004717,
"out_bytes": 948110960598,
"slow_consumers": 0,
"subscriptions": 1,
"http_req_stats": {
"/": 1,
"/connz": 100847,
"/routez": 0,
"/subsz": 1,
"/varz": 205785
},
"config_load_time": "2017-07-24T10:15:26.841483489-05:00"
}
`

func TestMetricsCorrect(t *testing.T) {
var acc testutil.Accumulator

srv := newTestNatsServer()
defer srv.Close()

n := &Nats{Server: srv.URL}
err := n.Gather(&acc)
require.NoError(t, err)

/*
* We fetch the measurement and copy its uptime value into the expected
* fields. This is necessary because the uptime cannot be compared
* reliably: it is calculated from time.Now() and the Start value in Varz.
*/
s, f := acc.Get("nats")
assert.Equal(t, true, f, "nats measurement must be found")

fields := make(map[string]interface{})
fields["uptime"] = s.Fields["uptime"]
fields["in_msgs"] = int64(74148556)
fields["out_msgs"] = int64(68863261)
fields["connections"] = int(2)
fields["total_connections"] = uint64(109)
fields["in_bytes"] = int64(946267004717)
fields["out_bytes"] = int64(948110960598)
fields["cpu_usage"] = float64(9)
fields["mem"] = int64(15581184)
fields["subscriptions"] = uint32(1)

acc.AssertContainsFields(t, "nats", fields)
}

func newTestNatsServer() *httptest.Server {
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
var rsp string

switch r.URL.Path {
case "/varz":
rsp = sampleVarz
default:
panic("Cannot handle request")
}

fmt.Fprintln(w, rsp)
}))
}