Skip to content

Commit

Permalink
Add support to parse JSON array.
Browse files Browse the repository at this point in the history
  • Loading branch information
johnrengelman committed Nov 3, 2016
1 parent 522658b commit f585e60
Show file tree
Hide file tree
Showing 6 changed files with 337 additions and 8 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ documentation for configuring journald. There is also a [`logfile` config option
available in 1.1, which will allow users to easily configure telegraf to
continue sending logs to /var/log/telegraf/telegraf.log.

- The JSON parser can now parse JSON data where the root object is an array.
The parsing configuration is applied to each element of the array.

### Features

- [#1726](https://github.com/influxdata/telegraf/issues/1726): Processor & Aggregator plugin support.
Expand Down
56 changes: 56 additions & 0 deletions docs/DATA_FORMATS_INPUT.md
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,62 @@ Your Telegraf metrics would get tagged with "my_tag_1"
exec_mycollector,my_tag_1=foo a=5,b_c=6
```

If the JSON data is an array, then each element of the array is parsed with the configured settings.
Each resulting metric will be output with the same timestamp.

For example, if the following configuration:

```toml
[[inputs.exec]]
## Commands array
commands = ["/tmp/test.sh", "/usr/bin/mycollector --foo=bar"]

## measurement name suffix (for separating different commands)
name_suffix = "_mycollector"

## Data format to consume.
## Each data format has it's own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "json"

## List of tag names to extract from top-level of JSON server response
tag_keys = [
"my_tag_1",
"my_tag_2"
]
```

with this JSON output from a command:

```json
[
{
"a": 5,
"b": {
"c": 6
},
"my_tag_1": "foo",
"my_tag_2": "baz"
},
{
"a": 7,
"b": {
"c": 8
},
"my_tag_1": "bar",
"my_tag_2": "baz"
}
]
```

Your Telegraf metrics would get tagged with "my_tag_1" and "my_tag_2"

```
exec_mycollector,my_tag_1=foo,my_tag_2=baz a=5,b_c=6
exec_mycollector,my_tag_1=bar,my_tag_2=baz a=7,b_c=8
```

# Value:

The "value" data format translates single values into Telegraf metrics. This
Expand Down
52 changes: 52 additions & 0 deletions plugins/inputs/httpjson/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ You can also specify which keys from server response should be considered tags:
]
```

If the JSON response is an array of objects, then each object will be parsed with the same configuration.

You can also specify additional request parameters for the service:

```
Expand Down Expand Up @@ -150,3 +152,53 @@ httpjson_mycollector1_b_e,server='http://my.service.com/_stats' value=5
httpjson_mycollector2_load,server='http://service.net/json/stats' value=100
httpjson_mycollector2_users,server='http://service.net/json/stats' value=1335
```

# Example 3, Multiple Metrics in Response:

The response JSON can be treated as an array of data points that are all parsed with the same configuration.

```
[[inputs.httpjson]]
name = "mycollector"
servers = [
"http://my.service.com/_stats"
]
# HTTP method to use (case-sensitive)
method = "GET"
tag_keys = ["service"]
```

which responds with the following JSON:

```json
[
{
"service": "service01",
"a": 0.5,
"b": {
"c": "some text",
"d": 0.1,
"e": 5
}
},
{
"service": "service02",
"a": 0.6,
"b": {
"c": "some text",
"d": 0.2,
"e": 6
}
}
]
```

The collected metrics will be:
```
httpjson_mycollector_a,service='service01',server='http://my.service.com/_stats' value=0.5
httpjson_mycollector_b_d,service='service01',server='http://my.service.com/_stats' value=0.1
httpjson_mycollector_b_e,service='service01',server='http://my.service.com/_stats' value=5
httpjson_mycollector_a,service='service02',server='http://my.service.com/_stats' value=0.6
httpjson_mycollector_b_d,service='service02',server='http://my.service.com/_stats' value=0.2
httpjson_mycollector_b_e,service='service02',server='http://my.service.com/_stats' value=6
```
49 changes: 49 additions & 0 deletions plugins/inputs/httpjson/httpjson_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -511,3 +511,52 @@ func TestHttpJson200Tags(t *testing.T) {
}
}
}

const validJSONArrayTags = `
[
{
"value": 15,
"role": "master",
"build": "123"
},
{
"value": 17,
"role": "slave",
"build": "456"
}
]`

// Test that array data is collected correctly
func TestHttpJsonArray200Tags(t *testing.T) {
httpjson := genMockHttpJson(validJSONArrayTags, 200)

for _, service := range httpjson {
if service.Name == "other_webapp" {
var acc testutil.Accumulator
err := service.Gather(&acc)
// Set responsetime
for _, p := range acc.Metrics {
p.Fields["response_time"] = 1.0
}
require.NoError(t, err)
assert.Equal(t, 8, acc.NFields())
assert.Equal(t, uint64(4), acc.NMetrics())

for _, m := range acc.Metrics {
if m.Tags["role"] == "master" {
assert.Equal(t, "123", m.Tags["build"])
assert.Equal(t, float64(15), m.Fields["value"])
assert.Equal(t, float64(1), m.Fields["response_time"])
assert.Equal(t, "httpjson_"+service.Name, m.Measurement)
} else if m.Tags["role"] == "slave" {
assert.Equal(t, "456", m.Tags["build"])
assert.Equal(t, float64(17), m.Fields["value"])
assert.Equal(t, float64(1), m.Fields["response_time"])
assert.Equal(t, "httpjson_"+service.Name, m.Measurement)
} else {
assert.FailNow(t, "unknown metric")
}
}
}
}
}
41 changes: 37 additions & 4 deletions plugins/parsers/json/parser.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package json

import (
"bytes"
"encoding/json"
"fmt"
"strconv"
Expand All @@ -16,15 +17,22 @@ type JSONParser struct {
DefaultTags map[string]string
}

func (p *JSONParser) Parse(buf []byte) ([]telegraf.Metric, error) {
func (p *JSONParser) parseArray(buf []byte) ([]telegraf.Metric, error) {
metrics := make([]telegraf.Metric, 0)

var jsonOut map[string]interface{}
var jsonOut []map[string]interface{}
err := json.Unmarshal(buf, &jsonOut)
if err != nil {
err = fmt.Errorf("unable to parse out as JSON, %s", err)
err = fmt.Errorf("unable to parse out as JSON Array, %s", err)
return nil, err
}
for _, item := range jsonOut {
metrics, err = p.parseObject(metrics, item)
}
return metrics, nil
}

func (p *JSONParser) parseObject(metrics []telegraf.Metric, jsonOut map[string]interface{}) ([]telegraf.Metric, error) {

tags := make(map[string]string)
for k, v := range p.DefaultTags {
Expand All @@ -44,7 +52,7 @@ func (p *JSONParser) Parse(buf []byte) ([]telegraf.Metric, error) {
}

f := JSONFlattener{}
err = f.FlattenJSON("", jsonOut)
err := f.FlattenJSON("", jsonOut)
if err != nil {
return nil, err
}
Expand All @@ -57,6 +65,21 @@ func (p *JSONParser) Parse(buf []byte) ([]telegraf.Metric, error) {
return append(metrics, metric), nil
}

func (p *JSONParser) Parse(buf []byte) ([]telegraf.Metric, error) {

if !isarray(buf) {
metrics := make([]telegraf.Metric, 0)
var jsonOut map[string]interface{}
err := json.Unmarshal(buf, &jsonOut)
if err != nil {
err = fmt.Errorf("unable to parse out as JSON, %s", err)
return nil, err
}
return p.parseObject(metrics, jsonOut)
}
return p.parseArray(buf)
}

func (p *JSONParser) ParseLine(line string) (telegraf.Metric, error) {
metrics, err := p.Parse([]byte(line + "\n"))

Expand Down Expand Up @@ -115,3 +138,13 @@ func (f *JSONFlattener) FlattenJSON(
}
return nil
}

func isarray(buf []byte) bool {
ia := bytes.IndexByte(buf, '[')
ib := bytes.IndexByte(buf, '{')
if ia > -1 && ia < ib {
return true
} else {
return false
}
}
Loading

0 comments on commit f585e60

Please sign in to comment.