Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix reset handling by checking against previous value instead of reset value. #263

Merged
merged 4 commits into from
Jan 12, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion retrieval/series_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ type seriesCacheEntry struct {
suffix string
hash uint64

// Value of the most recent point seen for the time series. If a new value is
// less than the previous, then the series has reset.
previousValue float64

hasReset bool
resetValue float64
resetTimestamp int64
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Want to take the opportunity to add a blank line after this one? Or is this how the auto-formatter chose to format it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good idea -- added some additional comments too

Expand Down Expand Up @@ -295,19 +299,22 @@ func (c *seriesCache) getResetAdjusted(ref uint64, t int64, v float64) (int64, f
if !hasReset {
e.resetTimestamp = t
e.resetValue = v
e.previousValue = v
// If we just initialized the reset timestamp, this sample should be skipped.
// We don't know the window over which the current cumulative value was built up over.
// The next sample for will be considered from this point onwards.
return 0, 0, false
}
if v < e.resetValue {
if v < e.previousValue {
// If the value has dropped, there's been a reset.
// If the series was reset, set the reset timestamp to be one millisecond
// before the timestamp of the current sample.
// We don't know the true reset time but this ensures the range is non-zero
// while unlikely to conflict with any previous sample.
e.resetValue = 0
e.resetTimestamp = t - 1
}
e.previousValue = v
return e.resetTimestamp, v - e.resetValue, true
}

Expand Down
164 changes: 161 additions & 3 deletions retrieval/transform_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func TestSampleBuilder(t *testing.T) {
{Ref: 2, T: 2000, V: 5.5},
{Ref: 2, T: 3000, V: 8},
{Ref: 2, T: 4000, V: 9},
{Ref: 2, T: 5000, V: 3},
{Ref: 2, T: 5000, V: 7},
{Ref: 1, T: 1000, V: 200},
{Ref: 3, T: 3000, V: 1},
{Ref: 4, T: 4000, V: 2},
Expand Down Expand Up @@ -184,7 +184,7 @@ func TestSampleBuilder(t *testing.T) {
},
}},
},
{ // 3
{ // 3: Reset series since sample's value is less than previous value.
Resource: &monitoredres_pb.MonitoredResource{
Type: "resource2",
Labels: map[string]string{"resource_a": "resource2_a"},
Expand All @@ -201,7 +201,7 @@ func TestSampleBuilder(t *testing.T) {
EndTime: &timestamp_pb.Timestamp{Seconds: 5},
},
Value: &monitoring_pb.TypedValue{
Value: &monitoring_pb.TypedValue_DoubleValue{3},
Value: &monitoring_pb.TypedValue_DoubleValue{7},
},
}},
},
Expand Down Expand Up @@ -973,6 +973,164 @@ func TestSampleBuilder(t *testing.T) {
},
},
},
// Samples resulting in multiple resets for a single time series.
{
targets: targetMap{
"job1/instance1": &targets.Target{
Labels: promlabels.FromStrings("job", "job1", "instance", "instance1"),
DiscoveredLabels: promlabels.FromStrings("__resource_a", "resource2_a"),
},
},
series: seriesMap{
1: labels.FromStrings("job", "job1", "instance", "instance1", "__name__", "metric1_count"),
},
metadata: metadataMap{
"job1/instance1/metric1": &metadata.Entry{Metric: "metric1_count", MetricType: textparse.MetricTypeSummary, ValueType: metric_pb.MetricDescriptor_DOUBLE},
},
metricPrefix: "test.googleapis.com",
input: []tsdb.RefSample{
// The first result will always be nil due to reset timestamp handling.
{Ref: 1, T: 2000, V: 5}, // reset since first value; use as baseline
{Ref: 1, T: 3000, V: 8},
{Ref: 1, T: 4000, V: 9},
{Ref: 1, T: 5000, V: 8}, // reset since value dropped (8<9)
{Ref: 1, T: 6000, V: 4}, // reset since value dropped (4<8)
{Ref: 1, T: 7000, V: 12},
{Ref: 1, T: 8000, V: 1}, // reset since value dropped (1<12)
},
result: []*monitoring_pb.TimeSeries{
nil, // first sample of new series is always nil; used as reset baseline
{
Resource: &monitoredres_pb.MonitoredResource{
Type: "resource2",
Labels: map[string]string{"resource_a": "resource2_a"},
},
Metric: &metric_pb.Metric{
Type: "test.googleapis.com/metric1_count",
Labels: map[string]string{},
},
MetricKind: metric_pb.MetricDescriptor_CUMULATIVE,
ValueType: metric_pb.MetricDescriptor_INT64,
Points: []*monitoring_pb.Point{{
Interval: &monitoring_pb.TimeInterval{
StartTime: &timestamp_pb.Timestamp{Seconds: 2},
EndTime: &timestamp_pb.Timestamp{Seconds: 3},
},
Value: &monitoring_pb.TypedValue{
Value: &monitoring_pb.TypedValue_Int64Value{3},
},
}},
},
{
Resource: &monitoredres_pb.MonitoredResource{
Type: "resource2",
Labels: map[string]string{"resource_a": "resource2_a"},
},
Metric: &metric_pb.Metric{
Type: "test.googleapis.com/metric1_count",
Labels: map[string]string{},
},
MetricKind: metric_pb.MetricDescriptor_CUMULATIVE,
ValueType: metric_pb.MetricDescriptor_INT64,
Points: []*monitoring_pb.Point{{
Interval: &monitoring_pb.TimeInterval{
StartTime: &timestamp_pb.Timestamp{Seconds: 2},
EndTime: &timestamp_pb.Timestamp{Seconds: 4},
},
Value: &monitoring_pb.TypedValue{
Value: &monitoring_pb.TypedValue_Int64Value{4},
},
}},
},
// reset since value dropped (8<9)
{
Resource: &monitoredres_pb.MonitoredResource{
Type: "resource2",
Labels: map[string]string{"resource_a": "resource2_a"},
},
Metric: &metric_pb.Metric{
Type: "test.googleapis.com/metric1_count",
Labels: map[string]string{},
},
MetricKind: metric_pb.MetricDescriptor_CUMULATIVE,
ValueType: metric_pb.MetricDescriptor_INT64,
Points: []*monitoring_pb.Point{{
Interval: &monitoring_pb.TimeInterval{
StartTime: &timestamp_pb.Timestamp{Seconds: 4, Nanos: 1e9 - 1e6},
EndTime: &timestamp_pb.Timestamp{Seconds: 5},
},
Value: &monitoring_pb.TypedValue{
Value: &monitoring_pb.TypedValue_Int64Value{8},
},
}},
},
// reset since value dropped (4<8)
{
Resource: &monitoredres_pb.MonitoredResource{
Type: "resource2",
Labels: map[string]string{"resource_a": "resource2_a"},
},
Metric: &metric_pb.Metric{
Type: "test.googleapis.com/metric1_count",
Labels: map[string]string{},
},
MetricKind: metric_pb.MetricDescriptor_CUMULATIVE,
ValueType: metric_pb.MetricDescriptor_INT64,
Points: []*monitoring_pb.Point{{
Interval: &monitoring_pb.TimeInterval{
StartTime: &timestamp_pb.Timestamp{Seconds: 5, Nanos: 1e9 - 1e6},
EndTime: &timestamp_pb.Timestamp{Seconds: 6},
},
Value: &monitoring_pb.TypedValue{
Value: &monitoring_pb.TypedValue_Int64Value{4},
},
}},
},
{
Resource: &monitoredres_pb.MonitoredResource{
Type: "resource2",
Labels: map[string]string{"resource_a": "resource2_a"},
},
Metric: &metric_pb.Metric{
Type: "test.googleapis.com/metric1_count",
Labels: map[string]string{},
},
MetricKind: metric_pb.MetricDescriptor_CUMULATIVE,
ValueType: metric_pb.MetricDescriptor_INT64,
Points: []*monitoring_pb.Point{{
Interval: &monitoring_pb.TimeInterval{
StartTime: &timestamp_pb.Timestamp{Seconds: 5, Nanos: 1e9 - 1e6},
EndTime: &timestamp_pb.Timestamp{Seconds: 7},
},
Value: &monitoring_pb.TypedValue{
Value: &monitoring_pb.TypedValue_Int64Value{12},
},
}},
},
// reset since value dropped (1<12)
{
Resource: &monitoredres_pb.MonitoredResource{
Type: "resource2",
Labels: map[string]string{"resource_a": "resource2_a"},
},
Metric: &metric_pb.Metric{
Type: "test.googleapis.com/metric1_count",
Labels: map[string]string{},
},
MetricKind: metric_pb.MetricDescriptor_CUMULATIVE,
ValueType: metric_pb.MetricDescriptor_INT64,
Points: []*monitoring_pb.Point{{
Interval: &monitoring_pb.TimeInterval{
StartTime: &timestamp_pb.Timestamp{Seconds: 7, Nanos: 1e9 - 1e6},
EndTime: &timestamp_pb.Timestamp{Seconds: 8},
},
Value: &monitoring_pb.TypedValue{
Value: &monitoring_pb.TypedValue_Int64Value{1},
},
}},
},
},
},
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down