Skip to content

Commit

Permalink
[receiver/carbon] Permit float timestamps (open-telemetry#31313)
Browse files Browse the repository at this point in the history
**Description:** 

This allows timestamps to be sent as floats.

**Link to tracking Issue:** open-telemetry#31312 

**Testing:** 

Added unit test case with float timestamp.
  • Loading branch information
jonnangle authored and XinRanZhAWS committed Mar 13, 2024
1 parent f476cb6 commit 52572c0
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 24 deletions.
27 changes: 27 additions & 0 deletions .chloggen/carbon-float-metrics.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: carbonreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Accept carbon metrics with float timestamps

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [31312]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
18 changes: 13 additions & 5 deletions receiver/carbonreceiver/protocol/path_parser_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package protocol // import "github.com/open-telemetry/opentelemetry-collector-co
import (
"errors"
"fmt"
"math"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -104,13 +105,20 @@ func (pph *PathParserHelper) Parse(line string) (pmetric.Metric, error) {
return pmetric.Metric{}, fmt.Errorf("invalid carbon metric [%s]: %w", line, err)
}

unixTime, err := strconv.ParseInt(timestampStr, 10, 64)
if err != nil {
return pmetric.Metric{}, fmt.Errorf("invalid carbon metric time [%s]: %w", line, err)
var unixTimeNs int64
var dblVal float64
unixTime, errIsFloat := strconv.ParseInt(timestampStr, 10, 64)
if errIsFloat != nil {
dblVal, err = strconv.ParseFloat(timestampStr, 64)
if err != nil {
return pmetric.Metric{}, fmt.Errorf("invalid carbon metric time [%s]: %w", line, err)
}
sec, frac := math.Modf(dblVal)
unixTime = int64(sec)
unixTimeNs = int64(frac * (1e9))
}

intVal, errIsFloat := strconv.ParseInt(valueStr, 10, 64)
var dblVal float64
if errIsFloat != nil {
dblVal, err = strconv.ParseFloat(valueStr, 64)
if err != nil {
Expand All @@ -128,7 +136,7 @@ func (pph *PathParserHelper) Parse(line string) (pmetric.Metric, error) {
} else {
dp = m.SetEmptyGauge().DataPoints().AppendEmpty()
}
dp.SetTimestamp(pcommon.NewTimestampFromTime(time.Unix(unixTime, 0)))
dp.SetTimestamp(pcommon.NewTimestampFromTime(time.Unix(unixTime, unixTimeNs)))
if errIsFloat != nil {
dp.SetDoubleValue(dblVal)
} else {
Expand Down
75 changes: 56 additions & 19 deletions receiver/carbonreceiver/protocol/plaintext_parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,16 @@ func Test_plaintextParser_Parse(t *testing.T) {
GaugeMetricType,
"tst.int",
pcommon.NewMap(),
1582230020,
time.Unix(1582230020, 0),
1,
),
},
{
line: "tst.dbl 3.14 1582230020",
want: buildDoubleMetric(
GaugeMetricType,
"tst.dbl",
nil,
1582230020,
time.Unix(1582230020, 0),
3.14,
),
},
Expand All @@ -53,17 +52,16 @@ func Test_plaintextParser_Parse(t *testing.T) {
m.PutStr("k2", "v_2")
return m
}(),
1582230020,
time.Unix(1582230020, 0),
128,
),
},
{
line: "tst.int.1tag;k0=v_0 1.23 1582230020",
want: buildDoubleMetric(
GaugeMetricType,
"tst.int.1tag",
map[string]any{"k0": "v_0"},
1582230020,
time.Unix(1582230020, 0),
1.23,
),
},
Expand Down Expand Up @@ -92,6 +90,53 @@ func Test_plaintextParser_Parse(t *testing.T) {
assert.Equal(t, tt.wantErr, err != nil)
})
}

// tests for floating point timestamps
fpTests := []struct {
line string
want pmetric.Metric
}{
{
line: "tst.floattimestamp 3.14 1582230020.1234",
want: buildDoubleMetric(
"tst.floattimestamp",
nil,
time.Unix(1582230020, 123400000),
3.14,
),
},
{
line: "tst.floattimestampnofractionalpart 3.14 1582230020.",
want: buildDoubleMetric(
"tst.floattimestampnofractionalpart",
nil,
time.Unix(1582230020, 0),
3.14,
),
},
}

for _, tt := range fpTests {
t.Run(tt.line, func(t *testing.T) {
got, err := p.Parse(tt.line)
require.NoError(t, err)

// allow for rounding difference in float conversion.
assert.WithinDuration(
t,
tt.want.Gauge().DataPoints().At(0).Timestamp().AsTime(),
got.Gauge().DataPoints().At(0).Timestamp().AsTime(),
100*time.Nanosecond,
)

// if the delta on the timestamp is OK, copy them onto the test
// object so that we can test for equality on the remaining properties.
got.Gauge().DataPoints().At(0).SetTimestamp(
tt.want.Gauge().DataPoints().At(0).Timestamp(),
)
assert.Equal(t, tt.want, got)
})
}
}

func TestPlaintextParser_parsePath(t *testing.T) {
Expand Down Expand Up @@ -175,7 +220,7 @@ func buildIntMetric(
typ TargetMetricType,
name string,
attributes pcommon.Map,
timestamp int64,
timestamp time.Time,
value int64,
) pmetric.Metric {
m := pmetric.NewMetric()
Expand All @@ -188,30 +233,22 @@ func buildIntMetric(
} else {
dp = m.SetEmptyGauge().DataPoints().AppendEmpty()
}
dp.SetTimestamp(pcommon.NewTimestampFromTime(time.Unix(timestamp, 0)))
dp.SetTimestamp(pcommon.NewTimestampFromTime(timestamp))
attributes.CopyTo(dp.Attributes())
dp.SetIntValue(value)
return m
}

func buildDoubleMetric(
typ TargetMetricType,
name string,
attributes map[string]any,
timestamp int64,
timestamp time.Time,
value float64,
) pmetric.Metric {
m := pmetric.NewMetric()
m.SetName(name)
var dp pmetric.NumberDataPoint
if typ == CumulativeMetricType {
sum := m.SetEmptySum()
sum.SetIsMonotonic(true)
dp = sum.DataPoints().AppendEmpty()
} else {
dp = m.SetEmptyGauge().DataPoints().AppendEmpty()
}
dp.SetTimestamp(pcommon.NewTimestampFromTime(time.Unix(timestamp, 0)))
var dp pmetric.NumberDataPoint = m.SetEmptyGauge().DataPoints().AppendEmpty()
dp.SetTimestamp(pcommon.NewTimestampFromTime(timestamp))
_ = dp.Attributes().FromRaw(attributes)
dp.SetDoubleValue(value)
return m
Expand Down

0 comments on commit 52572c0

Please sign in to comment.