Skip to content

Commit

Permalink
Added Unix epoch timestamp support for JSON parser (influxdata#4633)
Browse files Browse the repository at this point in the history
  • Loading branch information
drenizg authored and otherpirate committed Mar 15, 2019
1 parent 279ed9a commit 21a351d
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 8 deletions.
6 changes: 6 additions & 0 deletions docs/DATA_FORMATS_INPUT.md
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,12 @@ config "json_time_key" and "json_time_format". If "json_time_key" is set,
"json_time_format" must be specified. The "json_time_key" describes the
name of the field containing time information. The "json_time_format"
must be a recognized Go time format.
If parsing a Unix epoch timestamp in seconds, e.g. 1536092344.1, this config must be set to "unix" (case insensitive);
corresponding JSON value can have a decimal part and can be a string or a number JSON representation.
If value is in number representation, it'll be treated as a double precision float, and could have some precision loss.
If value is in string representation, there'll be no precision loss up to nanosecond precision. Decimal positions beyond that will be dropped.
If parsing a Unix epoch timestamp in milliseconds, e.g. 1536092344100, this config must be set to "unix_ms" (case insensitive);
corresponding JSON value must be a (long) integer and be in number JSON representation.
If there is no year provided, the metrics will have the current year.
More info on time formats can be found here: https://golang.org/pkg/time/#Parse

Expand Down
69 changes: 61 additions & 8 deletions plugins/parsers/json/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ import (

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/pkg/errors"
"github.com/tidwall/gjson"
"math"
"regexp"
)

var (
Expand Down Expand Up @@ -47,6 +50,49 @@ func (p *JSONParser) parseArray(buf []byte) ([]telegraf.Metric, error) {
return metrics, nil
}

// format = "unix": epoch is assumed to be in seconds and can come as number or string. Can have a decimal part.
// format = "unix_ms": epoch is assumed to be in milliseconds and can come as number or string. Cannot have a decimal part.
func parseUnixTimestamp(jsonValue interface{}, format string) (time.Time, error) {
timeInt, timeFractional := int64(0), int64(0)
timeEpochStr, ok := jsonValue.(string)
var err error

if !ok {
timeEpochFloat, ok := jsonValue.(float64)
if !ok {
err := fmt.Errorf("time: %v could not be converted to string nor float64", jsonValue)
return time.Time{}, err
}
intPart, frac := math.Modf(timeEpochFloat)
timeInt, timeFractional = int64(intPart), int64(frac*1e9)
} else {
splitted := regexp.MustCompile("[.,]").Split(timeEpochStr, 2)
timeInt, err = strconv.ParseInt(splitted[0], 10, 64)
if err != nil {
return time.Time{}, err
}

if len(splitted) == 2 {
if len(splitted[1]) > 9 {
splitted[1] = splitted[1][:9] //truncates decimal part to nanoseconds precision
}
nanosecStr := splitted[1] + strings.Repeat("0", 9-len(splitted[1])) //adds 0's to the right to obtain a valid number of nanoseconds

timeFractional, err = strconv.ParseInt(nanosecStr, 10, 64)
if err != nil {
return time.Time{}, err
}
}
}
if strings.EqualFold(format, "unix") {
return time.Unix(timeInt, timeFractional).UTC(), nil
} else if strings.EqualFold(format, "unix_ms") {
return time.Unix(timeInt/1000, (timeInt%1000)*1e6).UTC(), nil
} else {
return time.Time{}, errors.New("Invalid unix format")
}
}

func (p *JSONParser) parseObject(metrics []telegraf.Metric, jsonOut map[string]interface{}) ([]telegraf.Metric, error) {

tags := make(map[string]string)
Expand Down Expand Up @@ -78,14 +124,21 @@ func (p *JSONParser) parseObject(metrics []telegraf.Metric, jsonOut map[string]i
return nil, err
}

timeStr, ok := f.Fields[p.JSONTimeKey].(string)
if !ok {
err := fmt.Errorf("time: %v could not be converted to string", f.Fields[p.JSONTimeKey])
return nil, err
}
nTime, err = time.Parse(p.JSONTimeFormat, timeStr)
if err != nil {
return nil, err
if strings.EqualFold(p.JSONTimeFormat, "unix") || strings.EqualFold(p.JSONTimeFormat, "unix_ms") {
nTime, err = parseUnixTimestamp(f.Fields[p.JSONTimeKey], p.JSONTimeFormat)
if err != nil {
return nil, err
}
} else {
timeStr, ok := f.Fields[p.JSONTimeKey].(string)
if !ok {
err := fmt.Errorf("time: %v could not be converted to string", f.Fields[p.JSONTimeKey])
return nil, err
}
nTime, err = time.Parse(p.JSONTimeFormat, timeStr)
if err != nil {
return nil, err
}
}

//if the year is 0, set to current year
Expand Down
66 changes: 66 additions & 0 deletions plugins/parsers/json/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -596,6 +596,72 @@ func TestTimeParser(t *testing.T) {
require.Equal(t, false, metrics[0].Time() == metrics[1].Time())
}

func TestUnixTimeParser(t *testing.T) {
testString := `[
{
"a": 5,
"b": {
"c": 6,
"time": "1536001411.1234567890"
},
"my_tag_1": "foo",
"my_tag_2": "baz"
},
{
"a": 7,
"b": {
"c": 8,
"time": 1536002769.123
},
"my_tag_1": "bar",
"my_tag_2": "baz"
}
]`

parser := JSONParser{
MetricName: "json_test",
JSONTimeKey: "b_time",
JSONTimeFormat: "unix",
}
metrics, err := parser.Parse([]byte(testString))
require.NoError(t, err)
require.Equal(t, 2, len(metrics))
require.Equal(t, false, metrics[0].Time() == metrics[1].Time())
}

func TestUnixMsTimeParser(t *testing.T) {
testString := `[
{
"a": 5,
"b": {
"c": 6,
"time": "1536001411100"
},
"my_tag_1": "foo",
"my_tag_2": "baz"
},
{
"a": 7,
"b": {
"c": 8,
"time": 1536002769123
},
"my_tag_1": "bar",
"my_tag_2": "baz"
}
]`

parser := JSONParser{
MetricName: "json_test",
JSONTimeKey: "b_time",
JSONTimeFormat: "unix_ms",
}
metrics, err := parser.Parse([]byte(testString))
require.NoError(t, err)
require.Equal(t, 2, len(metrics))
require.Equal(t, false, metrics[0].Time() == metrics[1].Time())
}

func TestTimeErrors(t *testing.T) {
testString := `{
"a": 5,
Expand Down

0 comments on commit 21a351d

Please sign in to comment.