Skip to content

Commit

Permalink
Fix detection of layout timestamps (influxdata#6390)
Browse files Browse the repository at this point in the history
  • Loading branch information
danielnelson authored and Mathieu Lecarme committed Apr 17, 2020
1 parent f4e3810 commit a0763dc
Show file tree
Hide file tree
Showing 4 changed files with 211 additions and 73 deletions.
140 changes: 96 additions & 44 deletions internal/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"math/big"
"os"
"os/exec"
"regexp"
"runtime"
"strconv"
"strings"
Expand Down Expand Up @@ -302,62 +301,115 @@ func CompressWithGzip(data io.Reader) (io.Reader, error) {
return pipeReader, err
}

// ParseTimestamp with no location provided parses a timestamp value as UTC
func ParseTimestamp(timestamp interface{}, format string) (time.Time, error) {
return ParseTimestampWithLocation(timestamp, format, "UTC")
// ParseTimestamp parses a Time according to the standard Telegraf options.
// These are generally displayed in the toml similar to:
// json_time_key= "timestamp"
// json_time_format = "2006-01-02T15:04:05Z07:00"
// json_timezone = "America/Los_Angeles"
//
// The format can be one of "unix", "unix_ms", "unix_us", "unix_ns", or a Go
// time layout suitable for time.Parse.
//
// When using the "unix" format, a optional fractional component is allowed.
// Specific unix time precisions cannot have a fractional component.
//
// Unix times may be an int64, float64, or string. When using a Go format
// string the timestamp must be a string.
//
// The location is a location string suitable for time.LoadLocation. Unix
// times do not use the location string, a unix time is always return in the
// UTC location.
func ParseTimestamp(format string, timestamp interface{}, location string) (time.Time, error) {
switch format {
case "unix", "unix_ms", "unix_us", "unix_ns":
return parseUnix(format, timestamp)
default:
if location == "" {
location = "UTC"
}
return parseTime(format, timestamp, location)
}
}

// ParseTimestamp parses a timestamp value as a unix epoch of various precision.
//
// format = "unix": epoch is assumed to be in seconds and can come as number or string. Can have a decimal part.
// format = "unix_ms": epoch is assumed to be in milliseconds and can come as number or string. Cannot have a decimal part.
// format = "unix_us": epoch is assumed to be in microseconds and can come as number or string. Cannot have a decimal part.
// format = "unix_ns": epoch is assumed to be in nanoseconds and can come as number or string. Cannot have a decimal part.
func ParseTimestampWithLocation(timestamp interface{}, format string, location string) (time.Time, error) {
timeInt, timeFractional := int64(0), int64(0)
func parseUnix(format string, timestamp interface{}) (time.Time, error) {
integer, fractional, err := parseComponents(timestamp)
if err != nil {
return time.Unix(0, 0), err
}

switch strings.ToLower(format) {
case "unix":
return time.Unix(integer, fractional).UTC(), nil
case "unix_ms":
return time.Unix(0, integer*1e6).UTC(), nil
case "unix_us":
return time.Unix(0, integer*1e3).UTC(), nil
case "unix_ns":
return time.Unix(0, integer).UTC(), nil
default:
return time.Unix(0, 0), errors.New("unsupported type")
}
}

// Returns the integers before and after an optional decimal point. Both '.'
// and ',' are supported for the decimal point. The timestamp can be an int64,
// float64, or string.
// ex: "42.5" -> (42, 5, nil)
func parseComponents(timestamp interface{}) (int64, int64, error) {
switch ts := timestamp.(type) {
case string:
var err error
splitted := regexp.MustCompile("[.,]").Split(ts, 2)
timeInt, err = strconv.ParseInt(splitted[0], 10, 64)
if err != nil {
loc, err := time.LoadLocation(location)
if err != nil {
return time.Time{}, fmt.Errorf("location: %s could not be loaded as a location", location)
}
return time.ParseInLocation(format, ts, loc)
parts := strings.SplitN(ts, ".", 2)
if len(parts) == 2 {
return parseUnixTimeComponents(parts[0], parts[1])
}

if len(splitted) == 2 {
if len(splitted[1]) > 9 {
splitted[1] = splitted[1][:9] //truncates decimal part to nanoseconds precision
}
nanosecStr := splitted[1] + strings.Repeat("0", 9-len(splitted[1])) //adds 0's to the right to obtain a valid number of nanoseconds
parts = strings.SplitN(ts, ",", 2)
if len(parts) == 2 {
return parseUnixTimeComponents(parts[0], parts[1])
}

timeFractional, err = strconv.ParseInt(nanosecStr, 10, 64)
if err != nil {
return time.Time{}, err
}
integer, err := strconv.ParseInt(ts, 10, 64)
if err != nil {
return 0, 0, err
}
return integer, 0, nil
case int64:
timeInt = ts
return ts, 0, nil
case float64:
intPart, frac := math.Modf(ts)
timeInt, timeFractional = int64(intPart), int64(frac*1e9)
integer, fractional := math.Modf(ts)
return int64(integer), int64(fractional * 1e9), nil
default:
return time.Time{}, fmt.Errorf("time: %v could not be converted to string nor float64", timestamp)
return 0, 0, errors.New("unsupported type")
}
}

func parseUnixTimeComponents(first, second string) (int64, int64, error) {
integer, err := strconv.ParseInt(first, 10, 64)
if err != nil {
return 0, 0, err
}

if strings.EqualFold(format, "unix") {
return time.Unix(timeInt, timeFractional).UTC(), nil
} else if strings.EqualFold(format, "unix_ms") {
return time.Unix(timeInt/1000, (timeInt%1000)*1e6).UTC(), nil
} else if strings.EqualFold(format, "unix_us") {
return time.Unix(0, timeInt*1e3).UTC(), nil
} else if strings.EqualFold(format, "unix_ns") {
return time.Unix(0, timeInt).UTC(), nil
} else {
return time.Time{}, errors.New("Invalid unix format")
// Convert to nanoseconds, dropping any greater precision.
buf := []byte("000000000")
copy(buf, second)

fractional, err := strconv.ParseInt(string(buf), 10, 64)
if err != nil {
return 0, 0, err
}
return integer, fractional, nil
}

// ParseTime parses a string timestamp according to the format string.
func parseTime(format string, timestamp interface{}, location string) (time.Time, error) {
switch ts := timestamp.(type) {
case string:
loc, err := time.LoadLocation(location)
if err != nil {
return time.Unix(0, 0), err
}
return time.ParseInLocation(format, ts, loc)
default:
return time.Unix(0, 0), errors.New("unsupported type")
}
}
140 changes: 113 additions & 27 deletions internal/internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,32 +331,118 @@ func TestAlignTime(t *testing.T) {
}

func TestParseTimestamp(t *testing.T) {
time, err := ParseTimestamp("2019-02-20 21:50:34.029665", "2006-01-02 15:04:05.000000")
assert.Nil(t, err)
assert.EqualValues(t, int64(1550699434029665000), time.UnixNano())

time, err = ParseTimestamp("2019-02-20 21:50:34.029665-04:00", "2006-01-02 15:04:05.000000-07:00")
assert.Nil(t, err)
assert.EqualValues(t, int64(1550713834029665000), time.UnixNano())

time, err = ParseTimestamp("2019-02-20 21:50:34.029665", "2006-01-02 15:04:05.000000-06:00")
assert.NotNil(t, err)
}

func TestParseTimestampWithLocation(t *testing.T) {
time, err := ParseTimestampWithLocation("2019-02-20 21:50:34.029665", "2006-01-02 15:04:05.000000", "UTC")
assert.Nil(t, err)
assert.EqualValues(t, int64(1550699434029665000), time.UnixNano())

time, err = ParseTimestampWithLocation("2019-02-20 21:50:34.029665", "2006-01-02 15:04:05.000000", "America/New_York")
assert.Nil(t, err)
assert.EqualValues(t, int64(1550717434029665000), time.UnixNano())

//Provided location is ignored if an offset is successfully parsed
time, err = ParseTimestampWithLocation("2019-02-20 21:50:34.029665-07:00", "2006-01-02 15:04:05.000000-07:00", "America/New_York")
assert.Nil(t, err)
assert.EqualValues(t, int64(1550724634029665000), time.UnixNano())
rfc3339 := func(value string) time.Time {
tm, err := time.Parse(time.RFC3339Nano, value)
if err != nil {
panic(err)
}
return tm
}

time, err = ParseTimestampWithLocation("2019-02-20 21:50:34.029665", "2006-01-02 15:04:05.000000", "InvalidTimeZone")
assert.NotNil(t, err)
tests := []struct {
name string
format string
timestamp interface{}
location string
expected time.Time
err bool
}{
{
name: "parse layout string in utc",
format: "2006-01-02 15:04:05",
timestamp: "2019-02-20 21:50:34",
location: "UTC",
expected: rfc3339("2019-02-20T21:50:34Z"),
},
{
name: "parse layout string with invalid timezone",
format: "2006-01-02 15:04:05",
timestamp: "2019-02-20 21:50:34",
location: "InvalidTimeZone",
err: true,
},
{
name: "layout regression 6386",
format: "02.01.2006 15:04:05",
timestamp: "09.07.2019 00:11:00",
expected: rfc3339("2019-07-09T00:11:00Z"),
},
{
name: "default location is utc",
format: "2006-01-02 15:04:05",
timestamp: "2019-02-20 21:50:34",
expected: rfc3339("2019-02-20T21:50:34Z"),
},
{
name: "unix seconds without fractional",
format: "unix",
timestamp: "1568338208",
expected: rfc3339("2019-09-13T01:30:08Z"),
},
{
name: "unix seconds with fractional",
format: "unix",
timestamp: "1568338208.500",
expected: rfc3339("2019-09-13T01:30:08.500Z"),
},
{
name: "unix seconds with fractional and comma decimal point",
format: "unix",
timestamp: "1568338208,500",
expected: rfc3339("2019-09-13T01:30:08.500Z"),
},
{
name: "unix seconds extra precision",
format: "unix",
timestamp: "1568338208.00000050042",
expected: rfc3339("2019-09-13T01:30:08.000000500Z"),
},
{
name: "unix seconds integer",
format: "unix",
timestamp: int64(1568338208),
expected: rfc3339("2019-09-13T01:30:08Z"),
},
{
name: "unix seconds float",
format: "unix",
timestamp: float64(1568338208.500),
expected: rfc3339("2019-09-13T01:30:08.500Z"),
},
{
name: "unix milliseconds",
format: "unix_ms",
timestamp: "1568338208500",
expected: rfc3339("2019-09-13T01:30:08.500Z"),
},
{
name: "unix milliseconds with fractional is ignored",
format: "unix_ms",
timestamp: "1568338208500.42",
expected: rfc3339("2019-09-13T01:30:08.500Z"),
},
{
name: "unix microseconds",
format: "unix_us",
timestamp: "1568338208000500",
expected: rfc3339("2019-09-13T01:30:08.000500Z"),
},
{
name: "unix nanoseconds",
format: "unix_ns",
timestamp: "1568338208000000500",
expected: rfc3339("2019-09-13T01:30:08.000000500Z"),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tm, err := ParseTimestamp(tt.format, tt.timestamp, tt.location)
if tt.err {
require.Error(t, err)
} else {
require.NoError(t, err)
require.Equal(t, tt.expected, tm)
}
})
}
}
2 changes: 1 addition & 1 deletion plugins/parsers/csv/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ func parseTimestamp(timeFunc func() time.Time, recordFields map[string]interface
case "":
return time.Time{}, fmt.Errorf("timestamp format must be specified")
default:
metricTime, err := internal.ParseTimestamp(recordFields[timestampColumn], timestampFormat)
metricTime, err := internal.ParseTimestamp(timestampFormat, recordFields[timestampColumn], "UTC")
if err != nil {
return time.Time{}, err
}
Expand Down
2 changes: 1 addition & 1 deletion plugins/parsers/json/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func (p *Parser) parseObject(data map[string]interface{}) ([]telegraf.Metric, er
return nil, err
}

nTime, err = internal.ParseTimestampWithLocation(f.Fields[p.timeKey], p.timeFormat, p.timezone)
nTime, err = internal.ParseTimestamp(p.timeFormat, f.Fields[p.timeKey], p.timezone)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit a0763dc

Please sign in to comment.