Skip to content

Commit

Permalink
Add support to parse JSON array.
Browse files Browse the repository at this point in the history
  • Loading branch information
johnrengelman committed Oct 28, 2016
1 parent 522658b commit 8e1cefa
Show file tree
Hide file tree
Showing 15 changed files with 382 additions and 26 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ documentation for configuring journald. There is also a [`logfile` config option
available in 1.1, which will allow users to easily configure telegraf to
continue sending logs to /var/log/telegraf/telegraf.log.

- The JSON parser can now parse JSON data where the root object is an array.
The parsing configuration is applied to each element of the array.

### Features

- [#1726](https://github.com/influxdata/telegraf/issues/1726): Processor & Aggregator plugin support.
Expand Down
65 changes: 65 additions & 0 deletions docs/DATA_FORMATS_INPUT.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,10 @@ For example, if you had this configuration:

## measurement name suffix (for separating different commands)
name_suffix = "_mycollector"

## specifies if the incoming JSON data is an array of metric data (true)
## to parse or a single object (false)
array = false

## Data format to consume.
## Each data format has it's own unique set of configuration options, read
Expand Down Expand Up @@ -147,6 +151,67 @@ Your Telegraf metrics would get tagged with "my_tag_1"
exec_mycollector,my_tag_1=foo a=5,b_c=6
```

If the JSON data is an array, then setting the `array` to `true` configures
Telegraf to parse each element of the array with the configured settings.
Each resulting metric will be output with the same timestamp.

For example, if you had this configuration:

```toml
[[inputs.exec]]
## Commands array
commands = ["/tmp/test.sh", "/usr/bin/mycollector --foo=bar"]

## measurement name suffix (for separating different commands)
name_suffix = "_mycollector"

## specifies if the incoming JSON data is an array of metric data (true)
## to parse or a single object (false)
array = true

## Data format to consume.
## Each data format has it's own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "json"

## List of tag names to extract from top-level of JSON server response
tag_keys = [
"my_tag_1",
"my_tag_2"
]
```

with this JSON output from a command:

```json
[
{
"a": 5,
"b": {
"c": 6
},
"my_tag_1": "foo",
"my_tag_2": "baz"
},
{
"a": 7,
"b": {
"c": 8
},
"my_tag_1": "bar",
"my_tag_2": "baz"
}
]
```

Your Telegraf metrics would get tagged with "my_tag_1" and "my_tag_2"

```
exec_mycollector,my_tag_1=foo,my_tag_2=baz a=5,b_c=6
exec_mycollector,my_tag_1=bar,my_tag_2=baz a=7,b_c=8
```

# Value:

The "value" data format translates single values into Telegraf metrics. This
Expand Down
2 changes: 1 addition & 1 deletion internal/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func TestConfig_LoadDirectory(t *testing.T) {
"Testdata did not produce correct memcached metadata.")

ex := inputs.Inputs["exec"]().(*exec.Exec)
p, err := parsers.NewJSONParser("exec", nil, nil)
p, err := parsers.NewJSONParser("exec", nil, nil, false)
assert.NoError(t, err)
ex.SetParser(p)
ex.Command = "/usr/bin/myothercollector --foo=bar"
Expand Down
6 changes: 3 additions & 3 deletions plugins/inputs/exec/exec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (r runnerMock) Run(e *Exec, command string, acc telegraf.Accumulator) ([]by
}

func TestExec(t *testing.T) {
parser, _ := parsers.NewJSONParser("exec", []string{}, nil)
parser, _ := parsers.NewJSONParser("exec", []string{}, nil, false)
e := &Exec{
runner: newRunnerMock([]byte(validJson), nil),
Commands: []string{"testcommand arg1"},
Expand All @@ -117,7 +117,7 @@ func TestExec(t *testing.T) {
}

func TestExecMalformed(t *testing.T) {
parser, _ := parsers.NewJSONParser("exec", []string{}, nil)
parser, _ := parsers.NewJSONParser("exec", []string{}, nil, false)
e := &Exec{
runner: newRunnerMock([]byte(malformedJson), nil),
Commands: []string{"badcommand arg1"},
Expand All @@ -131,7 +131,7 @@ func TestExecMalformed(t *testing.T) {
}

func TestCommandError(t *testing.T) {
parser, _ := parsers.NewJSONParser("exec", []string{}, nil)
parser, _ := parsers.NewJSONParser("exec", []string{}, nil, false)
e := &Exec{
runner: newRunnerMock(nil, fmt.Errorf("exit status code 1")),
Commands: []string{"badcommand"},
Expand Down
60 changes: 60 additions & 0 deletions plugins/inputs/httpjson/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,15 @@ You can also specify which keys from server response should be considered tags:
]
```

You can also specify if the response is an array of items that should be parsed:

```
[[inputs.httpjson]]
...
array = true
```

You can also specify additional request parameters for the service:

```
Expand Down Expand Up @@ -150,3 +159,54 @@ httpjson_mycollector1_b_e,server='http://my.service.com/_stats' value=5
httpjson_mycollector2_load,server='http://service.net/json/stats' value=100
httpjson_mycollector2_users,server='http://service.net/json/stats' value=1335
```

# Example 3, Multiple Metrics in Response:

The response JSON can be treated as an array of data points that are all parsed with the same configuration.

```
[[inputs.httpjson]]
name = "mycollector"
servers = [
"http://my.service.com/_stats"
]
# HTTP method to use (case-sensitive)
method = "GET"
array = true
tag_keys = ["service"]
```

which responds with the following JSON:

```json
[
{
"service": "service01",
"a": 0.5,
"b": {
"c": "some text",
"d": 0.1,
"e": 5
}
},
{
"service": "service02",
"a": 0.6,
"b": {
"c": "some text",
"d": 0.2,
"e": 6
}
}
]
```

The collected metrics will be:
```
httpjson_mycollector_a,service='service01',server='http://my.service.com/_stats' value=0.5
httpjson_mycollector_b_d,service='service01',server='http://my.service.com/_stats' value=0.1
httpjson_mycollector_b_e,service='service01',server='http://my.service.com/_stats' value=5
httpjson_mycollector_a,service='service02',server='http://my.service.com/_stats' value=0.6
httpjson_mycollector_b_d,service='service02',server='http://my.service.com/_stats' value=0.2
httpjson_mycollector_b_e,service='service02',server='http://my.service.com/_stats' value=6
```
7 changes: 6 additions & 1 deletion plugins/inputs/httpjson/httpjson.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type HttpJson struct {
Servers []string
Method string
TagKeys []string
Array bool
ResponseTimeout internal.Duration
Parameters map[string]string
Headers map[string]string
Expand Down Expand Up @@ -87,6 +88,10 @@ var sampleConfig = `
## HTTP method to use: GET or POST (case-sensitive)
method = "GET"
## Specifies if the JSON data is a single object or an array of
## metric objects
array = false
## List of tag names to extract from top-level of JSON server response
# tag_keys = [
# "my_tag_1",
Expand Down Expand Up @@ -195,7 +200,7 @@ func (h *HttpJson) gatherServer(
"server": serverURL,
}

parser, err := parsers.NewJSONParser(msrmnt_name, h.TagKeys, tags)
parser, err := parsers.NewJSONParser(msrmnt_name, h.TagKeys, tags, h.Array)
if err != nil {
return err
}
Expand Down
70 changes: 63 additions & 7 deletions plugins/inputs/httpjson/httpjson_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func (c *mockHTTPClient) HTTPClient() *http.Client {
//
// Returns:
// *HttpJson: Pointer to an HttpJson object that uses the generated mock HTTP client
func genMockHttpJson(response string, statusCode int) []*HttpJson {
func genMockHttpJson(response string, statusCode int, array bool) []*HttpJson {
return []*HttpJson{
&HttpJson{
client: &mockHTTPClient{responseBody: response, statusCode: statusCode},
Expand All @@ -171,6 +171,7 @@ func genMockHttpJson(response string, statusCode int) []*HttpJson {
},
Name: "my_webapp",
Method: "GET",
Array: array,
Parameters: map[string]string{
"httpParam1": "12",
"httpParam2": "the second parameter",
Expand All @@ -188,6 +189,7 @@ func genMockHttpJson(response string, statusCode int) []*HttpJson {
},
Name: "other_webapp",
Method: "POST",
Array: array,
Parameters: map[string]string{
"httpParam1": "12",
"httpParam2": "the second parameter",
Expand All @@ -206,7 +208,7 @@ func genMockHttpJson(response string, statusCode int) []*HttpJson {

// Test that the proper values are ignored or collected
func TestHttpJson200(t *testing.T) {
httpjson := genMockHttpJson(validJSON, 200)
httpjson := genMockHttpJson(validJSON, 200, false)

for _, service := range httpjson {
var acc testutil.Accumulator
Expand Down Expand Up @@ -445,7 +447,7 @@ func TestHttpJsonPOST(t *testing.T) {

// Test response to HTTP 500
func TestHttpJson500(t *testing.T) {
httpjson := genMockHttpJson(validJSON, 500)
httpjson := genMockHttpJson(validJSON, 500, false)

var acc testutil.Accumulator
err := httpjson[0].Gather(&acc)
Expand All @@ -456,7 +458,7 @@ func TestHttpJson500(t *testing.T) {

// Test response to HTTP 405
func TestHttpJsonBadMethod(t *testing.T) {
httpjson := genMockHttpJson(validJSON, 200)
httpjson := genMockHttpJson(validJSON, 200, false)
httpjson[0].Method = "NOT_A_REAL_METHOD"

var acc testutil.Accumulator
Expand All @@ -468,7 +470,7 @@ func TestHttpJsonBadMethod(t *testing.T) {

// Test response to malformed JSON
func TestHttpJsonBadJson(t *testing.T) {
httpjson := genMockHttpJson(invalidJSON, 200)
httpjson := genMockHttpJson(invalidJSON, 200, false)

var acc testutil.Accumulator
err := httpjson[0].Gather(&acc)
Expand All @@ -479,7 +481,7 @@ func TestHttpJsonBadJson(t *testing.T) {

// Test response to empty string as response objectgT
func TestHttpJsonEmptyResponse(t *testing.T) {
httpjson := genMockHttpJson(empty, 200)
httpjson := genMockHttpJson(empty, 200, false)

var acc testutil.Accumulator
err := httpjson[0].Gather(&acc)
Expand All @@ -490,7 +492,7 @@ func TestHttpJsonEmptyResponse(t *testing.T) {

// Test that the proper values are ignored or collected
func TestHttpJson200Tags(t *testing.T) {
httpjson := genMockHttpJson(validJSONTags, 200)
httpjson := genMockHttpJson(validJSONTags, 200, false)

for _, service := range httpjson {
if service.Name == "other_webapp" {
Expand All @@ -511,3 +513,57 @@ func TestHttpJson200Tags(t *testing.T) {
}
}
}

const validJSONArrayTags = `
[
{
"value": 15,
"role": "master",
"build": "123"
},
{
"value": 17,
"role": "slave",
"build": "456"
}
]`

// Test that array data is collected correctly
func TestHttpJsonArray200Tags(t *testing.T) {
httpjson := genMockHttpJson(validJSONArrayTags, 200, true)

for _, service := range httpjson {
if service.Name == "other_webapp" {
var acc testutil.Accumulator
err := service.Gather(&acc)
// Set responsetime
for _, p := range acc.Metrics {
p.Fields["response_time"] = 1.0
}
require.NoError(t, err)
assert.Equal(t, 8, acc.NFields())
assert.Equal(t, uint64(4), acc.NMetrics())
// for _, srv := range service.Servers {
// tags := map[string]string{"server": srv, "role": "master", "build": "123"}
// fields := map[string]interface{}{"value": float64(15), "response_time": float64(1)}
// mname := "httpjson_" + service.Name
// acc.AssertContainsTaggedFields(t, mname, fields, tags)
// }
for _, m := range acc.Metrics {
if m.Tags["role"] == "master" {
assert.Equal(t, "123", m.Tags["build"])
assert.Equal(t, float64(15), m.Fields["value"])
assert.Equal(t, float64(1), m.Fields["response_time"])
assert.Equal(t, "httpjson_"+service.Name, m.Measurement)
} else if m.Tags["role"] == "slave" {
assert.Equal(t, "456", m.Tags["build"])
assert.Equal(t, float64(17), m.Fields["value"])
assert.Equal(t, float64(1), m.Fields["response_time"])
assert.Equal(t, "httpjson_"+service.Name, m.Measurement)
} else {
assert.FailNow(t, "unknown metric")
}
}
}
}
}
2 changes: 1 addition & 1 deletion plugins/inputs/kafka_consumer/kafka_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func TestRunParserAndGatherJSON(t *testing.T) {
k.acc = &acc
defer close(k.done)

k.parser, _ = parsers.NewJSONParser("kafka_json_test", []string{}, nil)
k.parser, _ = parsers.NewJSONParser("kafka_json_test", []string{}, nil, false)
go k.receiver()
in <- saramaMsg(testMsgJSON)
time.Sleep(time.Millisecond * 5)
Expand Down
2 changes: 1 addition & 1 deletion plugins/inputs/mqtt_consumer/mqtt_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func TestRunParserAndGatherJSON(t *testing.T) {
n.acc = &acc
defer close(n.done)

n.parser, _ = parsers.NewJSONParser("nats_json_test", []string{}, nil)
n.parser, _ = parsers.NewJSONParser("nats_json_test", []string{}, nil, false)
go n.receiver()
in <- mqttMsg(testMsgJSON)
time.Sleep(time.Millisecond * 25)
Expand Down
2 changes: 1 addition & 1 deletion plugins/inputs/nats_consumer/nats_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func TestRunParserAndGatherJSON(t *testing.T) {
n.acc = &acc
defer close(n.done)

n.parser, _ = parsers.NewJSONParser("nats_json_test", []string{}, nil)
n.parser, _ = parsers.NewJSONParser("nats_json_test", []string{}, nil, false)
go n.receiver()
in <- natsMsg(testMsgJSON)
time.Sleep(time.Millisecond * 25)
Expand Down
2 changes: 1 addition & 1 deletion plugins/inputs/tcp_listener/tcp_listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ func TestRunParserJSONMsg(t *testing.T) {
listener.acc = &acc
defer close(listener.done)

listener.parser, _ = parsers.NewJSONParser("udp_json_test", []string{}, nil)
listener.parser, _ = parsers.NewJSONParser("udp_json_test", []string{}, nil, false)
listener.wg.Add(1)
go listener.tcpParser()

Expand Down
Loading

0 comments on commit 8e1cefa

Please sign in to comment.