Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/jolokia plugin #337

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions plugins/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
_ "github.com/influxdb/telegraf/plugins/exec"
_ "github.com/influxdb/telegraf/plugins/haproxy"
_ "github.com/influxdb/telegraf/plugins/httpjson"
_ "github.com/influxdb/telegraf/plugins/jolokia"
_ "github.com/influxdb/telegraf/plugins/kafka_consumer"
_ "github.com/influxdb/telegraf/plugins/leofs"
_ "github.com/influxdb/telegraf/plugins/lustre2"
Expand Down
213 changes: 213 additions & 0 deletions plugins/jolokia/jolokia.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
package jolokia

import (
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"strings"
// "sync"

"github.com/influxdb/telegraf/plugins"
)

type Server struct {
Name string
Host string
Port string
}

type Metric struct {
Name string
Jmx string
Pass []string
Drop []string
}

type JolokiaClient interface {
MakeRequest(req *http.Request) (*http.Response, error)
}

type JolokiaClientImpl struct {
client *http.Client
}

func (c JolokiaClientImpl) MakeRequest(req *http.Request) (*http.Response, error) {
return c.client.Do(req)
}

type Jolokia struct {
jClient JolokiaClient
Context string
Servers []Server
Metrics []Metric
Tags map[string]string
}

func (j *Jolokia) SampleConfig() string {
return `
context = "/jolokia/read"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you put some comments in here to briefly describe some of these options?


[jolokia.tags]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should be space-indented

group = "as"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also space-indented


[[jolokia.servers]]
name = "stable"
host = "192.168.103.2"
port = "8180"

[[jolokia.metrics]]
name = "heap_memory_usage"
jmx = "/java.lang:type=Memory/HeapMemoryUsage"
pass = ["used"]

[[jolokia.metrics]]
name = "memory_eden"
jmx = "/java.lang:type=MemoryPool,name=PS Eden Space/Usage"
pass = ["used"]

[[jolokia.metrics]]
name = "heap_threads"
jmx = "/java.lang:type=Threading"
# drop = ["AllThread"]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this line commented?

pass = ["CurrentThreadCpuTime","CurrentThreadUserTime","DaemonThreadCount","ThreadCount","TotalStartedThreadCount"]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

line-length, split this list on multiple lines like:

pass = [
  "foo",
  "bar",
  ...
]

`
}

func (j *Jolokia) Description() string {
return "Read JMX metrics through Jolokia"
}

func (j *Jolokia) getAttr(requestUrl *url.URL) (map[string]interface{}, error) {
// Create + send request
req, err := http.NewRequest("GET", requestUrl.String(), nil)
if err != nil {
return nil, err
}

resp, err := j.jClient.MakeRequest(req)
if err != nil {
return nil, err
}

if err != nil {
return nil, err
}
defer resp.Body.Close()

// Process response
if resp.StatusCode != http.StatusOK {
err = fmt.Errorf("Response from url \"%s\" has status code %d (%s), expected %d (%s)",
requestUrl,
resp.StatusCode,
http.StatusText(resp.StatusCode),
http.StatusOK,
http.StatusText(http.StatusOK))
return nil, err
}

// read body
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}

// Unmarshal json
var jsonOut map[string]interface{}
if err = json.Unmarshal([]byte(body), &jsonOut); err != nil {
return nil, errors.New("Error decoding JSON response")
}

return jsonOut, nil
}

func (m *Metric) shouldPass(field string) bool {

if m.Pass != nil {

for _, pass := range m.Pass {
if strings.HasPrefix(field, pass) {
return true
}
}

return false
}

if m.Drop != nil {

for _, drop := range m.Drop {
if strings.HasPrefix(field, drop) {
return false
}
}

return true
}

return true
}

func (m *Metric) filterFields(fields map[string]interface{}) map[string]interface{} {

for field, _ := range fields {
if !m.shouldPass(field) {
delete(fields, field)
}
}

return fields
}

func (j *Jolokia) Gather(acc plugins.Accumulator) error {

context := j.Context //"/jolokia/read"
servers := j.Servers
metrics := j.Metrics
tags := j.Tags

if tags == nil {
tags = map[string]string{}
}

for _, server := range servers {
for _, metric := range metrics {

measurement := metric.Name
jmxPath := metric.Jmx

tags["server"] = server.Name
tags["port"] = server.Port
tags["host"] = server.Host

// Prepare URL
requestUrl, err := url.Parse("http://" + server.Host + ":" + server.Port + context + jmxPath)
if err != nil {
return err
}

out, _ := j.getAttr(requestUrl)

if values, ok := out["value"]; ok {
switch values.(type) {
case map[string]interface{}:
acc.AddFields(measurement, metric.filterFields(values.(map[string]interface{})), tags)
case interface{}:
acc.Add(measurement, values.(interface{}), tags)
}
} else {
fmt.Printf("Missing key 'value' in '%s' output response\n", requestUrl.String())
}
}
}

return nil
}

func init() {
plugins.Add("jolokia", func() plugins.Plugin {
return &Jolokia{jClient: &JolokiaClientImpl{client: &http.Client{}}}
})
}
147 changes: 147 additions & 0 deletions plugins/jolokia/jolokia_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
package jolokia

import (
_ "fmt"
"io/ioutil"
"net/http"
"strings"
"testing"

"github.com/influxdb/telegraf/testutil"
"github.com/stretchr/testify/assert"
_ "github.com/stretchr/testify/require"
)

const validMultiValueJSON = `
{
"request":{
"mbean":"java.lang:type=Memory",
"attribute":"HeapMemoryUsage",
"type":"read"
},
"value":{
"init":67108864,
"committed":456130560,
"max":477626368,
"used":203288528
},
"timestamp":1446129191,
"status":200
}`

const validSingleValueJSON = `
{
"request":{
"path":"used",
"mbean":"java.lang:type=Memory",
"attribute":"HeapMemoryUsage",
"type":"read"
},
"value":209274376,
"timestamp":1446129256,
"status":200
}`

const invalidJSON = "I don't think this is JSON"

const empty = ""

var Servers = []Server{Server{Name: "as1", Host: "127.0.0.1", Port: "8080"}}
var HeapMetric = Metric{Name: "heap_memory_usage", Jmx: "/java.lang:type=Memory/HeapMemoryUsage"}
var UsedHeapMetric = Metric{Name: "heap_memory_usage", Jmx: "/java.lang:type=Memory/HeapMemoryUsage", Pass: []string{"used"}}

type jolokiaClientStub struct {
responseBody string
statusCode int
}

func (c jolokiaClientStub) MakeRequest(req *http.Request) (*http.Response, error) {
resp := http.Response{}
resp.StatusCode = c.statusCode
resp.Body = ioutil.NopCloser(strings.NewReader(c.responseBody))
return &resp, nil
}

// Generates a pointer to an HttpJson object that uses a mock HTTP client.
// Parameters:
// response : Body of the response that the mock HTTP client should return
// statusCode: HTTP status code the mock HTTP client should return
//
// Returns:
// *HttpJson: Pointer to an HttpJson object that uses the generated mock HTTP client
func genJolokiaClientStub(response string, statusCode int, servers []Server, metrics []Metric) *Jolokia {
return &Jolokia{
jClient: jolokiaClientStub{responseBody: response, statusCode: statusCode},
Servers: servers,
Metrics: metrics,
}
}

// Test that the proper values are ignored or collected
func TestHttpJsonMultiValue(t *testing.T) {

jolokia := genJolokiaClientStub(validMultiValueJSON, 200, Servers, []Metric{HeapMetric})

var acc testutil.Accumulator
err := jolokia.Gather(&acc)

assert.Nil(t, err)
assert.Equal(t, 1, len(acc.Points))

assert.True(t, acc.CheckFieldsValue("heap_memory_usage", map[string]interface{}{"init": 67108864.0,
"committed": 456130560.0,
"max": 477626368.0,
"used": 203288528.0}))
}

// Test that the proper values are ignored or collected
func TestHttpJsonMultiValueWithPass(t *testing.T) {

jolokia := genJolokiaClientStub(validMultiValueJSON, 200, Servers, []Metric{UsedHeapMetric})

var acc testutil.Accumulator
err := jolokia.Gather(&acc)

assert.Nil(t, err)
assert.Equal(t, 1, len(acc.Points))

assert.True(t, acc.CheckFieldsValue("heap_memory_usage", map[string]interface{}{"used": 203288528.0}))
}

// Test that the proper values are ignored or collected
func TestHttpJsonMultiValueTags(t *testing.T) {

jolokia := genJolokiaClientStub(validMultiValueJSON, 200, Servers, []Metric{UsedHeapMetric})

var acc testutil.Accumulator
err := jolokia.Gather(&acc)

assert.Nil(t, err)
assert.Equal(t, 1, len(acc.Points))
assert.NoError(t, acc.ValidateTaggedFieldsValue("heap_memory_usage", map[string]interface{}{"used": 203288528.0}, map[string]string{"host": "127.0.0.1", "port": "8080", "server": "as1"}))
}

// Test that the proper values are ignored or collected
func TestHttpJsonSingleValueTags(t *testing.T) {

jolokia := genJolokiaClientStub(validSingleValueJSON, 200, Servers, []Metric{UsedHeapMetric})

var acc testutil.Accumulator
err := jolokia.Gather(&acc)

assert.Nil(t, err)
assert.Equal(t, 1, len(acc.Points))
assert.NoError(t, acc.ValidateTaggedFieldsValue("heap_memory_usage", map[string]interface{}{"value": 209274376.0}, map[string]string{"host": "127.0.0.1", "port": "8080", "server": "as1"}))
}

// Test that the proper values are ignored or collected
func TestHttpJsonOn404(t *testing.T) {

jolokia := genJolokiaClientStub(validMultiValueJSON, 404, Servers, []Metric{UsedHeapMetric})

var acc testutil.Accumulator
err := jolokia.Gather(&acc)

assert.Nil(t, err)
assert.Equal(t, 0, len(acc.Points))
}
Loading