From 8e1cefa799cdfb631de6fe6e4a8538014c24d951 Mon Sep 17 00:00:00 2001 From: John Engelman Date: Thu, 27 Oct 2016 13:21:33 -0500 Subject: [PATCH] Add support to parse JSON array. --- CHANGELOG.md | 3 + docs/DATA_FORMATS_INPUT.md | 65 ++++++++ internal/config/config_test.go | 2 +- plugins/inputs/exec/exec_test.go | 6 +- plugins/inputs/httpjson/README.md | 60 +++++++ plugins/inputs/httpjson/httpjson.go | 7 +- plugins/inputs/httpjson/httpjson_test.go | 70 ++++++++- .../kafka_consumer/kafka_consumer_test.go | 2 +- .../mqtt_consumer/mqtt_consumer_test.go | 2 +- .../nats_consumer/nats_consumer_test.go | 2 +- .../inputs/tcp_listener/tcp_listener_test.go | 2 +- .../inputs/udp_listener/udp_listener_test.go | 2 +- plugins/parsers/json/parser.go | 31 +++- plugins/parsers/json/parser_test.go | 148 +++++++++++++++++- plugins/parsers/registry.go | 6 +- 15 files changed, 382 insertions(+), 26 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index dfa79ebd8ca90..5ec9e62e8afe4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,9 @@ documentation for configuring journald. There is also a [`logfile` config option available in 1.1, which will allow users to easily configure telegraf to continue sending logs to /var/log/telegraf/telegraf.log. +- The JSON parser can now parse JSON data where the root object is an array. +The parsing configuration is applied to each element of the array. + ### Features - [#1726](https://github.com/influxdata/telegraf/issues/1726): Processor & Aggregator plugin support. diff --git a/docs/DATA_FORMATS_INPUT.md b/docs/DATA_FORMATS_INPUT.md index c14752d9cdf81..a7db9c038480d 100644 --- a/docs/DATA_FORMATS_INPUT.md +++ b/docs/DATA_FORMATS_INPUT.md @@ -115,6 +115,10 @@ For example, if you had this configuration: ## measurement name suffix (for separating different commands) name_suffix = "_mycollector" + + ## specifies if the incoming JSON data is an array of metric data (true) + ## to parse or a single object (false) + array = false ## Data format to consume. ## Each data format has it's own unique set of configuration options, read @@ -147,6 +151,67 @@ Your Telegraf metrics would get tagged with "my_tag_1" exec_mycollector,my_tag_1=foo a=5,b_c=6 ``` +If the JSON data is an array, then setting the `array` to `true` configures +Telegraf to parse each element of the array with the configured settings. +Each resulting metric will be output with the same timestamp. + +For example, if you had this configuration: + +```toml +[[inputs.exec]] + ## Commands array + commands = ["/tmp/test.sh", "/usr/bin/mycollector --foo=bar"] + + ## measurement name suffix (for separating different commands) + name_suffix = "_mycollector" + + ## specifies if the incoming JSON data is an array of metric data (true) + ## to parse or a single object (false) + array = true + + ## Data format to consume. + ## Each data format has it's own unique set of configuration options, read + ## more about them here: + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md + data_format = "json" + + ## List of tag names to extract from top-level of JSON server response + tag_keys = [ + "my_tag_1", + "my_tag_2" + ] +``` + +with this JSON output from a command: + +```json +[ + { + "a": 5, + "b": { + "c": 6 + }, + "my_tag_1": "foo", + "my_tag_2": "baz" + }, + { + "a": 7, + "b": { + "c": 8 + }, + "my_tag_1": "bar", + "my_tag_2": "baz" + } +] +``` + +Your Telegraf metrics would get tagged with "my_tag_1" and "my_tag_2" + +``` +exec_mycollector,my_tag_1=foo,my_tag_2=baz a=5,b_c=6 +exec_mycollector,my_tag_1=bar,my_tag_2=baz a=7,b_c=8 +``` + # Value: The "value" data format translates single values into Telegraf metrics. This diff --git a/internal/config/config_test.go b/internal/config/config_test.go index 3498d815d0078..51df7ccf317d6 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -143,7 +143,7 @@ func TestConfig_LoadDirectory(t *testing.T) { "Testdata did not produce correct memcached metadata.") ex := inputs.Inputs["exec"]().(*exec.Exec) - p, err := parsers.NewJSONParser("exec", nil, nil) + p, err := parsers.NewJSONParser("exec", nil, nil, false) assert.NoError(t, err) ex.SetParser(p) ex.Command = "/usr/bin/myothercollector --foo=bar" diff --git a/plugins/inputs/exec/exec_test.go b/plugins/inputs/exec/exec_test.go index ac527a12f5790..86970272e0681 100644 --- a/plugins/inputs/exec/exec_test.go +++ b/plugins/inputs/exec/exec_test.go @@ -91,7 +91,7 @@ func (r runnerMock) Run(e *Exec, command string, acc telegraf.Accumulator) ([]by } func TestExec(t *testing.T) { - parser, _ := parsers.NewJSONParser("exec", []string{}, nil) + parser, _ := parsers.NewJSONParser("exec", []string{}, nil, false) e := &Exec{ runner: newRunnerMock([]byte(validJson), nil), Commands: []string{"testcommand arg1"}, @@ -117,7 +117,7 @@ func TestExec(t *testing.T) { } func TestExecMalformed(t *testing.T) { - parser, _ := parsers.NewJSONParser("exec", []string{}, nil) + parser, _ := parsers.NewJSONParser("exec", []string{}, nil, false) e := &Exec{ runner: newRunnerMock([]byte(malformedJson), nil), Commands: []string{"badcommand arg1"}, @@ -131,7 +131,7 @@ func TestExecMalformed(t *testing.T) { } func TestCommandError(t *testing.T) { - parser, _ := parsers.NewJSONParser("exec", []string{}, nil) + parser, _ := parsers.NewJSONParser("exec", []string{}, nil, false) e := &Exec{ runner: newRunnerMock(nil, fmt.Errorf("exit status code 1")), Commands: []string{"badcommand"}, diff --git a/plugins/inputs/httpjson/README.md b/plugins/inputs/httpjson/README.md index 81680e6ec16ba..a14846d1fac82 100644 --- a/plugins/inputs/httpjson/README.md +++ b/plugins/inputs/httpjson/README.md @@ -37,6 +37,15 @@ You can also specify which keys from server response should be considered tags: ] ``` +You can also specify if the response is an array of items that should be parsed: + +``` +[[inputs.httpjson]] + ... + + array = true +``` + You can also specify additional request parameters for the service: ``` @@ -150,3 +159,54 @@ httpjson_mycollector1_b_e,server='http://my.service.com/_stats' value=5 httpjson_mycollector2_load,server='http://service.net/json/stats' value=100 httpjson_mycollector2_users,server='http://service.net/json/stats' value=1335 ``` + +# Example 3, Multiple Metrics in Response: + +The response JSON can be treated as an array of data points that are all parsed with the same configuration. + +``` +[[inputs.httpjson]] + name = "mycollector" + servers = [ + "http://my.service.com/_stats" + ] + # HTTP method to use (case-sensitive) + method = "GET" + array = true + tag_keys = ["service"] +``` + +which responds with the following JSON: + +```json +[ + { + "service": "service01", + "a": 0.5, + "b": { + "c": "some text", + "d": 0.1, + "e": 5 + } + }, + { + "service": "service02", + "a": 0.6, + "b": { + "c": "some text", + "d": 0.2, + "e": 6 + } + } +] +``` + +The collected metrics will be: +``` +httpjson_mycollector_a,service='service01',server='http://my.service.com/_stats' value=0.5 +httpjson_mycollector_b_d,service='service01',server='http://my.service.com/_stats' value=0.1 +httpjson_mycollector_b_e,service='service01',server='http://my.service.com/_stats' value=5 +httpjson_mycollector_a,service='service02',server='http://my.service.com/_stats' value=0.6 +httpjson_mycollector_b_d,service='service02',server='http://my.service.com/_stats' value=0.2 +httpjson_mycollector_b_e,service='service02',server='http://my.service.com/_stats' value=6 +``` diff --git a/plugins/inputs/httpjson/httpjson.go b/plugins/inputs/httpjson/httpjson.go index 89bfccf77a31d..cf450a7e56fa0 100644 --- a/plugins/inputs/httpjson/httpjson.go +++ b/plugins/inputs/httpjson/httpjson.go @@ -22,6 +22,7 @@ type HttpJson struct { Servers []string Method string TagKeys []string + Array bool ResponseTimeout internal.Duration Parameters map[string]string Headers map[string]string @@ -87,6 +88,10 @@ var sampleConfig = ` ## HTTP method to use: GET or POST (case-sensitive) method = "GET" + ## Specifies if the JSON data is a single object or an array of + ## metric objects + array = false + ## List of tag names to extract from top-level of JSON server response # tag_keys = [ # "my_tag_1", @@ -195,7 +200,7 @@ func (h *HttpJson) gatherServer( "server": serverURL, } - parser, err := parsers.NewJSONParser(msrmnt_name, h.TagKeys, tags) + parser, err := parsers.NewJSONParser(msrmnt_name, h.TagKeys, tags, h.Array) if err != nil { return err } diff --git a/plugins/inputs/httpjson/httpjson_test.go b/plugins/inputs/httpjson/httpjson_test.go index 31447b307176f..d9a737a470def 100644 --- a/plugins/inputs/httpjson/httpjson_test.go +++ b/plugins/inputs/httpjson/httpjson_test.go @@ -161,7 +161,7 @@ func (c *mockHTTPClient) HTTPClient() *http.Client { // // Returns: // *HttpJson: Pointer to an HttpJson object that uses the generated mock HTTP client -func genMockHttpJson(response string, statusCode int) []*HttpJson { +func genMockHttpJson(response string, statusCode int, array bool) []*HttpJson { return []*HttpJson{ &HttpJson{ client: &mockHTTPClient{responseBody: response, statusCode: statusCode}, @@ -171,6 +171,7 @@ func genMockHttpJson(response string, statusCode int) []*HttpJson { }, Name: "my_webapp", Method: "GET", + Array: array, Parameters: map[string]string{ "httpParam1": "12", "httpParam2": "the second parameter", @@ -188,6 +189,7 @@ func genMockHttpJson(response string, statusCode int) []*HttpJson { }, Name: "other_webapp", Method: "POST", + Array: array, Parameters: map[string]string{ "httpParam1": "12", "httpParam2": "the second parameter", @@ -206,7 +208,7 @@ func genMockHttpJson(response string, statusCode int) []*HttpJson { // Test that the proper values are ignored or collected func TestHttpJson200(t *testing.T) { - httpjson := genMockHttpJson(validJSON, 200) + httpjson := genMockHttpJson(validJSON, 200, false) for _, service := range httpjson { var acc testutil.Accumulator @@ -445,7 +447,7 @@ func TestHttpJsonPOST(t *testing.T) { // Test response to HTTP 500 func TestHttpJson500(t *testing.T) { - httpjson := genMockHttpJson(validJSON, 500) + httpjson := genMockHttpJson(validJSON, 500, false) var acc testutil.Accumulator err := httpjson[0].Gather(&acc) @@ -456,7 +458,7 @@ func TestHttpJson500(t *testing.T) { // Test response to HTTP 405 func TestHttpJsonBadMethod(t *testing.T) { - httpjson := genMockHttpJson(validJSON, 200) + httpjson := genMockHttpJson(validJSON, 200, false) httpjson[0].Method = "NOT_A_REAL_METHOD" var acc testutil.Accumulator @@ -468,7 +470,7 @@ func TestHttpJsonBadMethod(t *testing.T) { // Test response to malformed JSON func TestHttpJsonBadJson(t *testing.T) { - httpjson := genMockHttpJson(invalidJSON, 200) + httpjson := genMockHttpJson(invalidJSON, 200, false) var acc testutil.Accumulator err := httpjson[0].Gather(&acc) @@ -479,7 +481,7 @@ func TestHttpJsonBadJson(t *testing.T) { // Test response to empty string as response objectgT func TestHttpJsonEmptyResponse(t *testing.T) { - httpjson := genMockHttpJson(empty, 200) + httpjson := genMockHttpJson(empty, 200, false) var acc testutil.Accumulator err := httpjson[0].Gather(&acc) @@ -490,7 +492,7 @@ func TestHttpJsonEmptyResponse(t *testing.T) { // Test that the proper values are ignored or collected func TestHttpJson200Tags(t *testing.T) { - httpjson := genMockHttpJson(validJSONTags, 200) + httpjson := genMockHttpJson(validJSONTags, 200, false) for _, service := range httpjson { if service.Name == "other_webapp" { @@ -511,3 +513,57 @@ func TestHttpJson200Tags(t *testing.T) { } } } + +const validJSONArrayTags = ` +[ + { + "value": 15, + "role": "master", + "build": "123" + }, + { + "value": 17, + "role": "slave", + "build": "456" + } +]` + +// Test that array data is collected correctly +func TestHttpJsonArray200Tags(t *testing.T) { + httpjson := genMockHttpJson(validJSONArrayTags, 200, true) + + for _, service := range httpjson { + if service.Name == "other_webapp" { + var acc testutil.Accumulator + err := service.Gather(&acc) + // Set responsetime + for _, p := range acc.Metrics { + p.Fields["response_time"] = 1.0 + } + require.NoError(t, err) + assert.Equal(t, 8, acc.NFields()) + assert.Equal(t, uint64(4), acc.NMetrics()) + // for _, srv := range service.Servers { + // tags := map[string]string{"server": srv, "role": "master", "build": "123"} + // fields := map[string]interface{}{"value": float64(15), "response_time": float64(1)} + // mname := "httpjson_" + service.Name + // acc.AssertContainsTaggedFields(t, mname, fields, tags) + // } + for _, m := range acc.Metrics { + if m.Tags["role"] == "master" { + assert.Equal(t, "123", m.Tags["build"]) + assert.Equal(t, float64(15), m.Fields["value"]) + assert.Equal(t, float64(1), m.Fields["response_time"]) + assert.Equal(t, "httpjson_"+service.Name, m.Measurement) + } else if m.Tags["role"] == "slave" { + assert.Equal(t, "456", m.Tags["build"]) + assert.Equal(t, float64(17), m.Fields["value"]) + assert.Equal(t, float64(1), m.Fields["response_time"]) + assert.Equal(t, "httpjson_"+service.Name, m.Measurement) + } else { + assert.FailNow(t, "unknown metric") + } + } + } + } +} diff --git a/plugins/inputs/kafka_consumer/kafka_consumer_test.go b/plugins/inputs/kafka_consumer/kafka_consumer_test.go index 609dc6a377d8d..399f2e62f13ff 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer_test.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer_test.go @@ -108,7 +108,7 @@ func TestRunParserAndGatherJSON(t *testing.T) { k.acc = &acc defer close(k.done) - k.parser, _ = parsers.NewJSONParser("kafka_json_test", []string{}, nil) + k.parser, _ = parsers.NewJSONParser("kafka_json_test", []string{}, nil, false) go k.receiver() in <- saramaMsg(testMsgJSON) time.Sleep(time.Millisecond * 5) diff --git a/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go b/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go index 7090a46c35edf..3d80f0e05b9a0 100644 --- a/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go +++ b/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go @@ -153,7 +153,7 @@ func TestRunParserAndGatherJSON(t *testing.T) { n.acc = &acc defer close(n.done) - n.parser, _ = parsers.NewJSONParser("nats_json_test", []string{}, nil) + n.parser, _ = parsers.NewJSONParser("nats_json_test", []string{}, nil, false) go n.receiver() in <- mqttMsg(testMsgJSON) time.Sleep(time.Millisecond * 25) diff --git a/plugins/inputs/nats_consumer/nats_consumer_test.go b/plugins/inputs/nats_consumer/nats_consumer_test.go index 75fde66a6ca1d..2b95ce4a6e6c5 100644 --- a/plugins/inputs/nats_consumer/nats_consumer_test.go +++ b/plugins/inputs/nats_consumer/nats_consumer_test.go @@ -108,7 +108,7 @@ func TestRunParserAndGatherJSON(t *testing.T) { n.acc = &acc defer close(n.done) - n.parser, _ = parsers.NewJSONParser("nats_json_test", []string{}, nil) + n.parser, _ = parsers.NewJSONParser("nats_json_test", []string{}, nil, false) go n.receiver() in <- natsMsg(testMsgJSON) time.Sleep(time.Millisecond * 25) diff --git a/plugins/inputs/tcp_listener/tcp_listener_test.go b/plugins/inputs/tcp_listener/tcp_listener_test.go index f7e5784d3e2df..b5abe31a6ab28 100644 --- a/plugins/inputs/tcp_listener/tcp_listener_test.go +++ b/plugins/inputs/tcp_listener/tcp_listener_test.go @@ -299,7 +299,7 @@ func TestRunParserJSONMsg(t *testing.T) { listener.acc = &acc defer close(listener.done) - listener.parser, _ = parsers.NewJSONParser("udp_json_test", []string{}, nil) + listener.parser, _ = parsers.NewJSONParser("udp_json_test", []string{}, nil, false) listener.wg.Add(1) go listener.tcpParser() diff --git a/plugins/inputs/udp_listener/udp_listener_test.go b/plugins/inputs/udp_listener/udp_listener_test.go index fa9980682c19e..1582b02df539a 100644 --- a/plugins/inputs/udp_listener/udp_listener_test.go +++ b/plugins/inputs/udp_listener/udp_listener_test.go @@ -182,7 +182,7 @@ func TestRunParserJSONMsg(t *testing.T) { listener.acc = &acc defer close(listener.done) - listener.parser, _ = parsers.NewJSONParser("udp_json_test", []string{}, nil) + listener.parser, _ = parsers.NewJSONParser("udp_json_test", []string{}, nil, false) listener.wg.Add(1) go listener.udpParser() diff --git a/plugins/parsers/json/parser.go b/plugins/parsers/json/parser.go index 180f2452aff15..40930e0c53036 100644 --- a/plugins/parsers/json/parser.go +++ b/plugins/parsers/json/parser.go @@ -14,17 +14,25 @@ type JSONParser struct { MetricName string TagKeys []string DefaultTags map[string]string + Array bool } -func (p *JSONParser) Parse(buf []byte) ([]telegraf.Metric, error) { +func (p *JSONParser) parseArray(buf []byte) ([]telegraf.Metric, error) { metrics := make([]telegraf.Metric, 0) - var jsonOut map[string]interface{} + var jsonOut []map[string]interface{} err := json.Unmarshal(buf, &jsonOut) if err != nil { - err = fmt.Errorf("unable to parse out as JSON, %s", err) + err = fmt.Errorf("unable to parse out as JSON Array, %s", err) return nil, err } + for _, item := range jsonOut { + metrics, err = p.parseObject(metrics, item) + } + return metrics, nil +} + +func (p *JSONParser) parseObject(metrics []telegraf.Metric, jsonOut map[string]interface{}) ([]telegraf.Metric, error) { tags := make(map[string]string) for k, v := range p.DefaultTags { @@ -44,7 +52,7 @@ func (p *JSONParser) Parse(buf []byte) ([]telegraf.Metric, error) { } f := JSONFlattener{} - err = f.FlattenJSON("", jsonOut) + err := f.FlattenJSON("", jsonOut) if err != nil { return nil, err } @@ -57,6 +65,21 @@ func (p *JSONParser) Parse(buf []byte) ([]telegraf.Metric, error) { return append(metrics, metric), nil } +func (p *JSONParser) Parse(buf []byte) ([]telegraf.Metric, error) { + + if !p.Array { + metrics := make([]telegraf.Metric, 0) + var jsonOut map[string]interface{} + err := json.Unmarshal(buf, &jsonOut) + if err != nil { + err = fmt.Errorf("unable to parse out as JSON, %s", err) + return nil, err + } + return p.parseObject(metrics, jsonOut) + } + return p.parseArray(buf) +} + func (p *JSONParser) ParseLine(line string) (telegraf.Metric, error) { metrics, err := p.Parse([]byte(line + "\n")) diff --git a/plugins/parsers/json/parser_test.go b/plugins/parsers/json/parser_test.go index f3e6d9404e990..98809ea1fc0ee 100644 --- a/plugins/parsers/json/parser_test.go +++ b/plugins/parsers/json/parser_test.go @@ -7,10 +7,12 @@ import ( ) const ( - validJSON = "{\"a\": 5, \"b\": {\"c\": 6}}" - validJSONNewline = "\n{\"d\": 7, \"b\": {\"d\": 8}}\n" - invalidJSON = "I don't think this is JSON" - invalidJSON2 = "{\"a\": 5, \"b\": \"c\": 6}}" + validJSON = "{\"a\": 5, \"b\": {\"c\": 6}}" + validJSONNewline = "\n{\"d\": 7, \"b\": {\"d\": 8}}\n" + validJSONArray = "[{\"a\": 5, \"b\": {\"c\": 6}}]" + validJSONArrayMultiple = "[{\"a\": 5, \"b\": {\"c\": 6}}, {\"a\": 7, \"b\": {\"c\": 8}}]" + invalidJSON = "I don't think this is JSON" + invalidJSON2 = "{\"a\": 5, \"b\": \"c\": 6}}" ) const validJSONTags = ` @@ -24,6 +26,27 @@ const validJSONTags = ` } ` +const validJSONArrayTags = ` +[ +{ + "a": 5, + "b": { + "c": 6 + }, + "mytag": "foo", + "othertag": "baz" +}, +{ + "a": 7, + "b": { + "c": 8 + }, + "mytag": "bar", + "othertag": "baz" +} +] +` + func TestParseValidJSON(t *testing.T) { parser := JSONParser{ MetricName: "json_test", @@ -282,3 +305,120 @@ func TestParseValidJSONDefaultTagsOverride(t *testing.T) { "mytag": "foobar", }, metrics[0].Tags()) } + +// Test that json arrays can be parsed +func TestParseValidJSONArray(t *testing.T) { + parser := JSONParser{ + MetricName: "json_array_test", + Array: true, + } + + // Most basic vanilla test + metrics, err := parser.Parse([]byte(validJSONArray)) + assert.NoError(t, err) + assert.Len(t, metrics, 1) + assert.Equal(t, "json_array_test", metrics[0].Name()) + assert.Equal(t, map[string]interface{}{ + "a": float64(5), + "b_c": float64(6), + }, metrics[0].Fields()) + assert.Equal(t, map[string]string{}, metrics[0].Tags()) + + // Basic multiple datapoints + metrics, err = parser.Parse([]byte(validJSONArrayMultiple)) + assert.NoError(t, err) + assert.Len(t, metrics, 2) + assert.Equal(t, "json_array_test", metrics[0].Name()) + assert.Equal(t, map[string]interface{}{ + "a": float64(5), + "b_c": float64(6), + }, metrics[0].Fields()) + assert.Equal(t, map[string]string{}, metrics[1].Tags()) + assert.Equal(t, "json_array_test", metrics[1].Name()) + assert.Equal(t, map[string]interface{}{ + "a": float64(7), + "b_c": float64(8), + }, metrics[1].Fields()) + assert.Equal(t, map[string]string{}, metrics[1].Tags()) +} + +func TestParseArrayWithTagKeys(t *testing.T) { + // Test that strings not matching tag keys are ignored + parser := JSONParser{ + MetricName: "json_array_test", + Array: true, + TagKeys: []string{"wrongtagkey"}, + } + metrics, err := parser.Parse([]byte(validJSONArrayTags)) + assert.NoError(t, err) + assert.Len(t, metrics, 2) + assert.Equal(t, "json_array_test", metrics[0].Name()) + assert.Equal(t, map[string]interface{}{ + "a": float64(5), + "b_c": float64(6), + }, metrics[0].Fields()) + assert.Equal(t, map[string]string{}, metrics[0].Tags()) + + assert.Equal(t, "json_array_test", metrics[1].Name()) + assert.Equal(t, map[string]interface{}{ + "a": float64(7), + "b_c": float64(8), + }, metrics[1].Fields()) + assert.Equal(t, map[string]string{}, metrics[1].Tags()) + + // Test that single tag key is found and applied + parser = JSONParser{ + MetricName: "json_array_test", + Array: true, + TagKeys: []string{"mytag"}, + } + metrics, err = parser.Parse([]byte(validJSONArrayTags)) + assert.NoError(t, err) + assert.Len(t, metrics, 2) + assert.Equal(t, "json_array_test", metrics[0].Name()) + assert.Equal(t, map[string]interface{}{ + "a": float64(5), + "b_c": float64(6), + }, metrics[0].Fields()) + assert.Equal(t, map[string]string{ + "mytag": "foo", + }, metrics[0].Tags()) + + assert.Equal(t, "json_array_test", metrics[1].Name()) + assert.Equal(t, map[string]interface{}{ + "a": float64(7), + "b_c": float64(8), + }, metrics[1].Fields()) + assert.Equal(t, map[string]string{ + "mytag": "bar", + }, metrics[1].Tags()) + + // Test that both tag keys are found and applied + parser = JSONParser{ + MetricName: "json_array_test", + Array: true, + TagKeys: []string{"mytag", "othertag"}, + } + metrics, err = parser.Parse([]byte(validJSONArrayTags)) + assert.NoError(t, err) + assert.Len(t, metrics, 2) + assert.Equal(t, "json_array_test", metrics[0].Name()) + assert.Equal(t, map[string]interface{}{ + "a": float64(5), + "b_c": float64(6), + }, metrics[0].Fields()) + assert.Equal(t, map[string]string{ + "mytag": "foo", + "othertag": "baz", + }, metrics[0].Tags()) + + assert.Equal(t, "json_array_test", metrics[1].Name()) + assert.Equal(t, map[string]interface{}{ + "a": float64(7), + "b_c": float64(8), + }, metrics[1].Fields()) + assert.Equal(t, map[string]string{ + "mytag": "bar", + "othertag": "baz", + }, metrics[1].Tags()) +} diff --git a/plugins/parsers/registry.go b/plugins/parsers/registry.go index 360d795bc58f1..f0405f1fe1515 100644 --- a/plugins/parsers/registry.go +++ b/plugins/parsers/registry.go @@ -50,6 +50,8 @@ type Config struct { // TagKeys only apply to JSON data TagKeys []string + // Array only applies to JSON data + Array bool // MetricName applies to JSON & value. This will be the name of the measurement. MetricName string @@ -67,7 +69,7 @@ func NewParser(config *Config) (Parser, error) { switch config.DataFormat { case "json": parser, err = NewJSONParser(config.MetricName, - config.TagKeys, config.DefaultTags) + config.TagKeys, config.DefaultTags, config.Array) case "value": parser, err = NewValueParser(config.MetricName, config.DataType, config.DefaultTags) @@ -88,10 +90,12 @@ func NewJSONParser( metricName string, tagKeys []string, defaultTags map[string]string, + array bool, ) (Parser, error) { parser := &json.JSONParser{ MetricName: metricName, TagKeys: tagKeys, + Array: array, DefaultTags: defaultTags, } return parser, nil