diff --git a/docs/DATA_FORMATS_INPUT.md b/docs/DATA_FORMATS_INPUT.md index 7e57d9657aae1..235e3b3088514 100644 --- a/docs/DATA_FORMATS_INPUT.md +++ b/docs/DATA_FORMATS_INPUT.md @@ -131,6 +131,12 @@ config "json_time_key" and "json_time_format". If "json_time_key" is set, "json_time_format" must be specified. The "json_time_key" describes the name of the field containing time information. The "json_time_format" must be a recognized Go time format. +If parsing a Unix epoch timestamp in seconds, e.g. 1536092344.1, this config must be set to "unix" (case insensitive); +corresponding JSON value can have a decimal part and can be a string or a number JSON representation. +If value is in number representation, it'll be treated as a double precision float, and could have some precision loss. +If value is in string representation, there'll be no precision loss up to nanosecond precision. Decimal positions beyond that will be dropped. +If parsing a Unix epoch timestamp in milliseconds, e.g. 1536092344100, this config must be set to "unix_ms" (case insensitive); +corresponding JSON value must be a (long) integer and be in number JSON representation. If there is no year provided, the metrics will have the current year. More info on time formats can be found here: https://golang.org/pkg/time/#Parse diff --git a/plugins/parsers/json/parser.go b/plugins/parsers/json/parser.go index 9fb0816fe040f..697296a125771 100644 --- a/plugins/parsers/json/parser.go +++ b/plugins/parsers/json/parser.go @@ -11,7 +11,10 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/metric" + "github.com/pkg/errors" "github.com/tidwall/gjson" + "math" + "regexp" ) var ( @@ -47,6 +50,49 @@ func (p *JSONParser) parseArray(buf []byte) ([]telegraf.Metric, error) { return metrics, nil } +// format = "unix": epoch is assumed to be in seconds and can come as number or string. Can have a decimal part. +// format = "unix_ms": epoch is assumed to be in milliseconds and can come as number or string. Cannot have a decimal part. +func parseUnixTimestamp(jsonValue interface{}, format string) (time.Time, error) { + timeInt, timeFractional := int64(0), int64(0) + timeEpochStr, ok := jsonValue.(string) + var err error + + if !ok { + timeEpochFloat, ok := jsonValue.(float64) + if !ok { + err := fmt.Errorf("time: %v could not be converted to string nor float64", jsonValue) + return time.Time{}, err + } + intPart, frac := math.Modf(timeEpochFloat) + timeInt, timeFractional = int64(intPart), int64(frac*1e9) + } else { + splitted := regexp.MustCompile("[.,]").Split(timeEpochStr, 2) + timeInt, err = strconv.ParseInt(splitted[0], 10, 64) + if err != nil { + return time.Time{}, err + } + + if len(splitted) == 2 { + if len(splitted[1]) > 9 { + splitted[1] = splitted[1][:9] //truncates decimal part to nanoseconds precision + } + nanosecStr := splitted[1] + strings.Repeat("0", 9-len(splitted[1])) //adds 0's to the right to obtain a valid number of nanoseconds + + timeFractional, err = strconv.ParseInt(nanosecStr, 10, 64) + if err != nil { + return time.Time{}, err + } + } + } + if strings.EqualFold(format, "unix") { + return time.Unix(timeInt, timeFractional).UTC(), nil + } else if strings.EqualFold(format, "unix_ms") { + return time.Unix(timeInt/1000, (timeInt%1000)*1e6).UTC(), nil + } else { + return time.Time{}, errors.New("Invalid unix format") + } +} + func (p *JSONParser) parseObject(metrics []telegraf.Metric, jsonOut map[string]interface{}) ([]telegraf.Metric, error) { tags := make(map[string]string) @@ -78,14 +124,21 @@ func (p *JSONParser) parseObject(metrics []telegraf.Metric, jsonOut map[string]i return nil, err } - timeStr, ok := f.Fields[p.JSONTimeKey].(string) - if !ok { - err := fmt.Errorf("time: %v could not be converted to string", f.Fields[p.JSONTimeKey]) - return nil, err - } - nTime, err = time.Parse(p.JSONTimeFormat, timeStr) - if err != nil { - return nil, err + if strings.EqualFold(p.JSONTimeFormat, "unix") || strings.EqualFold(p.JSONTimeFormat, "unix_ms") { + nTime, err = parseUnixTimestamp(f.Fields[p.JSONTimeKey], p.JSONTimeFormat) + if err != nil { + return nil, err + } + } else { + timeStr, ok := f.Fields[p.JSONTimeKey].(string) + if !ok { + err := fmt.Errorf("time: %v could not be converted to string", f.Fields[p.JSONTimeKey]) + return nil, err + } + nTime, err = time.Parse(p.JSONTimeFormat, timeStr) + if err != nil { + return nil, err + } } //if the year is 0, set to current year diff --git a/plugins/parsers/json/parser_test.go b/plugins/parsers/json/parser_test.go index 39e43bece15e2..ec9ade251ddf8 100644 --- a/plugins/parsers/json/parser_test.go +++ b/plugins/parsers/json/parser_test.go @@ -596,6 +596,72 @@ func TestTimeParser(t *testing.T) { require.Equal(t, false, metrics[0].Time() == metrics[1].Time()) } +func TestUnixTimeParser(t *testing.T) { + testString := `[ + { + "a": 5, + "b": { + "c": 6, + "time": "1536001411.1234567890" + }, + "my_tag_1": "foo", + "my_tag_2": "baz" + }, + { + "a": 7, + "b": { + "c": 8, + "time": 1536002769.123 + }, + "my_tag_1": "bar", + "my_tag_2": "baz" + } + ]` + + parser := JSONParser{ + MetricName: "json_test", + JSONTimeKey: "b_time", + JSONTimeFormat: "unix", + } + metrics, err := parser.Parse([]byte(testString)) + require.NoError(t, err) + require.Equal(t, 2, len(metrics)) + require.Equal(t, false, metrics[0].Time() == metrics[1].Time()) +} + +func TestUnixMsTimeParser(t *testing.T) { + testString := `[ + { + "a": 5, + "b": { + "c": 6, + "time": "1536001411100" + }, + "my_tag_1": "foo", + "my_tag_2": "baz" + }, + { + "a": 7, + "b": { + "c": 8, + "time": 1536002769123 + }, + "my_tag_1": "bar", + "my_tag_2": "baz" + } + ]` + + parser := JSONParser{ + MetricName: "json_test", + JSONTimeKey: "b_time", + JSONTimeFormat: "unix_ms", + } + metrics, err := parser.Parse([]byte(testString)) + require.NoError(t, err) + require.Equal(t, 2, len(metrics)) + require.Equal(t, false, metrics[0].Time() == metrics[1].Time()) +} + func TestTimeErrors(t *testing.T) { testString := `{ "a": 5,