Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support string field glob matching in json parser #6102

Merged
merged 1 commit into from
Jul 15, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion plugins/parsers/json/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ ignored unless specified in the `tag_key` or `json_string_fields` options.
"my_tag_2"
]

## String fields is an array of keys that should be added as string fields.
## Array of glob pattern strings keys that should be added as string fields.
json_string_fields = []

## Name key is the key to use as the measurement name.
Expand Down
119 changes: 72 additions & 47 deletions plugins/parsers/json/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,30 +9,61 @@ import (
"strings"
"time"

"github.com/tidwall/gjson"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/filter"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/metric"
"github.com/tidwall/gjson"
)

var (
utf8BOM = []byte("\xef\xbb\xbf")
)

type JSONParser struct {
MetricName string
TagKeys []string
StringFields []string
JSONNameKey string
JSONQuery string
JSONTimeKey string
JSONTimeFormat string
JSONTimezone string
DefaultTags map[string]string
type Config struct {
MetricName string
TagKeys []string
NameKey string
StringFields []string
Query string
TimeKey string
TimeFormat string
Timezone string
DefaultTags map[string]string
}

type Parser struct {
metricName string
tagKeys []string
stringFields filter.Filter
nameKey string
query string
timeKey string
timeFormat string
timezone string
defaultTags map[string]string
}

func New(config *Config) (*Parser, error) {
stringFilter, err := filter.Compile(config.StringFields)
if err != nil {
return nil, err
}

return &Parser{
metricName: config.MetricName,
tagKeys: config.TagKeys,
nameKey: config.NameKey,
stringFields: stringFilter,
query: config.Query,
timeKey: config.TimeKey,
timeFormat: config.TimeFormat,
timezone: config.Timezone,
defaultTags: config.DefaultTags,
}, nil
}

func (p *JSONParser) parseArray(buf []byte) ([]telegraf.Metric, error) {
func (p *Parser) parseArray(buf []byte) ([]telegraf.Metric, error) {
metrics := make([]telegraf.Metric, 0)

var jsonOut []map[string]interface{}
Expand All @@ -50,9 +81,9 @@ func (p *JSONParser) parseArray(buf []byte) ([]telegraf.Metric, error) {
return metrics, nil
}

func (p *JSONParser) parseObject(metrics []telegraf.Metric, jsonOut map[string]interface{}) ([]telegraf.Metric, error) {
func (p *Parser) parseObject(metrics []telegraf.Metric, jsonOut map[string]interface{}) ([]telegraf.Metric, error) {
tags := make(map[string]string)
for k, v := range p.DefaultTags {
for k, v := range p.defaultTags {
tags[k] = v
}

Expand All @@ -62,33 +93,35 @@ func (p *JSONParser) parseObject(metrics []telegraf.Metric, jsonOut map[string]i
return nil, err
}

name := p.metricName

//checks if json_name_key is set
if p.JSONNameKey != "" {
switch field := f.Fields[p.JSONNameKey].(type) {
if p.nameKey != "" {
switch field := f.Fields[p.nameKey].(type) {
case string:
p.MetricName = field
name = field
}
}

//if time key is specified, set it to nTime
nTime := time.Now().UTC()
if p.JSONTimeKey != "" {
if p.JSONTimeFormat == "" {
if p.timeKey != "" {
if p.timeFormat == "" {
err := fmt.Errorf("use of 'json_time_key' requires 'json_time_format'")
return nil, err
}

if f.Fields[p.JSONTimeKey] == nil {
if f.Fields[p.timeKey] == nil {
err := fmt.Errorf("JSON time key could not be found")
return nil, err
}

nTime, err = internal.ParseTimestampWithLocation(f.Fields[p.JSONTimeKey], p.JSONTimeFormat, p.JSONTimezone)
nTime, err = internal.ParseTimestampWithLocation(f.Fields[p.timeKey], p.timeFormat, p.timezone)
if err != nil {
return nil, err
}

delete(f.Fields, p.JSONTimeKey)
delete(f.Fields, p.timeKey)

//if the year is 0, set to current year
if nTime.Year() == 0 {
Expand All @@ -97,7 +130,7 @@ func (p *JSONParser) parseObject(metrics []telegraf.Metric, jsonOut map[string]i
}

tags, nFields := p.switchFieldToTag(tags, f.Fields)
metric, err := metric.New(p.MetricName, tags, nFields, nTime)
metric, err := metric.New(name, tags, nFields, nTime)
if err != nil {
return nil, err
}
Expand All @@ -108,8 +141,8 @@ func (p *JSONParser) parseObject(metrics []telegraf.Metric, jsonOut map[string]i
//search for TagKeys that match fieldnames and add them to tags
//will delete any strings/bools that shouldn't be fields
//assumes that any non-numeric values in TagKeys should be displayed as tags
func (p *JSONParser) switchFieldToTag(tags map[string]string, fields map[string]interface{}) (map[string]string, map[string]interface{}) {
for _, name := range p.TagKeys {
func (p *Parser) switchFieldToTag(tags map[string]string, fields map[string]interface{}) (map[string]string, map[string]interface{}) {
for _, name := range p.tagKeys {
//switch any fields in tagkeys into tags
if fields[name] == nil {
continue
Expand All @@ -130,31 +163,23 @@ func (p *JSONParser) switchFieldToTag(tags map[string]string, fields map[string]
}

//remove any additional string/bool values from fields
for k := range fields {
//check if field is in StringFields
sField := false
for _, v := range p.StringFields {
if v == k {
sField = true
}
}
if sField {
continue
}

switch fields[k].(type) {
for fk := range fields {
switch fields[fk].(type) {
case string:
delete(fields, k)
if p.stringFields != nil && p.stringFields.Match(fk) {
continue
}
delete(fields, fk)
case bool:
delete(fields, k)
delete(fields, fk)
}
}
return tags, fields
}

func (p *JSONParser) Parse(buf []byte) ([]telegraf.Metric, error) {
if p.JSONQuery != "" {
result := gjson.GetBytes(buf, p.JSONQuery)
func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) {
if p.query != "" {
result := gjson.GetBytes(buf, p.query)
buf = []byte(result.Raw)
if !result.IsArray() && !result.IsObject() {
err := fmt.Errorf("E! Query path must lead to a JSON object or array of objects, but lead to: %v", result.Type)
Expand All @@ -181,7 +206,7 @@ func (p *JSONParser) Parse(buf []byte) ([]telegraf.Metric, error) {
return p.parseArray(buf)
}

func (p *JSONParser) ParseLine(line string) (telegraf.Metric, error) {
func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
metrics, err := p.Parse([]byte(line + "\n"))

if err != nil {
Expand All @@ -195,8 +220,8 @@ func (p *JSONParser) ParseLine(line string) (telegraf.Metric, error) {
return metrics[0], nil
}

func (p *JSONParser) SetDefaultTags(tags map[string]string) {
p.DefaultTags = tags
func (p *Parser) SetDefaultTags(tags map[string]string) {
p.defaultTags = tags
}

type JSONFlattener struct {
Expand Down
Loading